Spring Boot中集成Redis与MySQL
1. 环境准备与依赖配置
1.1 Maven 依赖管理
为了在 Spring Boot 项目中使用 Redis 和 MySQL,我们需要在 pom.xml
中添加必要的依赖。主要包括以下几个依赖:
- Spring Data Redis:用于在 Spring Boot 中集成 Redis,提供 RedisTemplate 进行操作。
- MySQL JDBC 驱动:用于连接 MySQL 数据库。
- Spring Data JPA:用于简化与 MySQL 数据库的交互,提供面向对象的数据库操作支持。
具体依赖代码:
<!-- Spring Data Redis 依赖,用于集成 Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- MySQL JDBC 驱动,用于连接 MySQL 数据库 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- Spring Data JPA 依赖,用于简化数据库访问 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
添加完这些依赖后,Spring Boot 项目就可以利用 Spring Data Redis 来操作 Redis,并通过 Spring Data JPA 连接和操作 MySQL 数据库。
1.2. 配置文件:设置 application.properties
文件
Spring Boot 项目中通常使用 application.properties
文件来配置应用所需的参数。以下是 Redis 和 MySQL 的详细连接参数设置说明。
Redis 配置
# Redis 主机地址,默认情况下是 localhost,本地使用无需修改
spring.redis.host=localhost
# Redis 服务端口,默认 6379
spring.redis.port=6379
# Redis 密码,如果没有设置密码,可以留空
spring.redis.password=your_redis_password
配置说明:
spring.redis.host
:Redis 服务器的主机地址,通常为localhost
(即本地)或 Redis 所在的服务器 IP。spring.redis.port
:Redis 服务器的端口号,默认是6379
,可以根据实际 Redis 配置进行调整。spring.redis.password
:Redis 的访问密码。如果 Redis 设置了密码保护(通常用于远程访问或安全性较高的场景),则在此处填写对应的密码。
MySQL 配置
# MySQL 数据库连接 URL,格式为:jdbc:mysql://[host]:[port]/[database_name]?参数
spring.datasource.url=jdbc:mysql://localhost:3306/your_database_name?useSSL=false&serverTimezone=UTC
# MySQL 数据库用户名
spring.datasource.username=your_mysql_username
# MySQL 数据库用户密码
spring.datasource.password=your_mysql_password
# JDBC 驱动类名称,Spring Boot 2.x 及以上版本中使用 com.mysql.cj.jdbc.Driver
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
配置说明:
spring.datasource.url
:数据库连接 URL。其格式为:jdbc:mysql://
:指定使用 MySQL 数据库的 JDBC 驱动。[host]
:MySQL 服务器主机地址,本地为localhost
,如果是远程则填写 IP 地址。[port]
:MySQL 服务端口号,默认是3306
,可以根据实际情况调整。[database_name]
:具体使用的数据库名称,需要在 MySQL 中提前创建好。?useSSL=false
:指定是否启用 SSL 连接(一般本地开发设为false
)。&serverTimezone=UTC
:设置服务器的时区为 UTC,防止可能的时区问题。
spring.datasource.username
:MySQL 数据库用户名,用于连接数据库。spring.datasource.password
:对应的用户名密码,确保输入正确。spring.datasource.driver-class-name
:指定 JDBC 驱动类。MySQL 使用com.mysql.cj.jdbc.Driver
(这是 MySQL 8.0 及以上版本的驱动类)。
1.3. 本地与远程服务搭建
1.3.1. Redis 服务启动
(1)本地 Redis 启动
安装 Redis:首先,你需要在本地环境中安装 Redis。根据操作系统的不同,可以通过 apt-get
(Linux)、brew
(macOS)或直接下载 Redis 可执行文件来安装。
启动 Redis 服务:
- 安装完成后,可以通过命令行启动 Redis 服务:
redis-server
- Redis 默认在
localhost
的6379
端口上监听请求。 - 启动后,你可以通过 Redis 客户端(
redis-cli
)测试连接。在终端输入以下命令:
redis-cli
- 然后执行
PING
命令。如果返回PONG
,表示 Redis 已经正常启动。
(2)远程 Redis 服务器配置
安全性设置:如果 Redis 部署在远程服务器上,建议为 Redis 设置密码。在 Redis 的配置文件(通常是 redis.conf
)中设置 requirepass your_password
,然后重新启动服务以生效。
防火墙配置:确保 Redis 端口(默认 6379
)在服务器的防火墙中开放,以允许远程访问。
网络连接测试:
- 确保你的应用主机能够通过 IP 地址和端口访问远程 Redis 服务器,可以在命令行测试连接:
redis-cli -h <远程IP> -p 6379 -a <密码>
- 如果连接成功,执行
PING
返回PONG
,则说明网络连接和认证都已正确配置。
(3)Redis 的远程访问限制
- 为了安全起见,Redis 的配置文件中默认限制了公网访问。可以在
redis.conf
文件中设置bind 0.0.0.0
以允许所有 IP 连接 Redis。 - 也可以在同一文件中设置
protected-mode yes
以启用受保护模式,如果不打算开放公网访问,可以禁用远程访问。
1.3.2. MySQL 服务启动
(1)本地 MySQL 启动
安装 MySQL:根据操作系统不同,可以通过 apt-get
、yum
、brew
或下载安装包来安装 MySQL。
启动 MySQL 服务:
- 安装后,启动 MySQL 服务。在不同系统上启动 MySQL 的命令可能不同,如在 Linux 中:
sudo service mysql start
- 使用以下命令行测试 MySQL 是否正常启动并连接:
mysql -u root -p
- 输入密码后,如果成功进入 MySQL 命令行界面,表示 MySQL 已正常运行。
(2)远程 MySQL 服务器配置
1.配置远程访问权限:默认情况下,MySQL 只允许本地连接。要开放远程访问权限,你需要修改 MySQL 的用户权限。在 MySQL 控制台执行以下命令:
CREATE USER 'your_user'@'%' IDENTIFIED BY 'your_password';
GRANT ALL PRIVILEGES ON your_database.* TO 'your_user'@'%';
FLUSH PRIVILEGES;
2.配置文件修改:修改 MySQL 配置文件(通常是 my.cnf
或 my.ini
),找到 [mysqld]
配置块,设置 bind-address
为 0.0.0.0
以允许所有 IP 地址访问:
[mysqld]
bind-address = 0.0.0.0
重新启动 MySQL 使配置生效。
3.防火墙配置:确保服务器上开放 MySQL 的默认端口(3306),允许远程访问。
4.测试远程连接:
使用以下命令测试远程连接(在另一台主机上):
mysql -h <远程IP> -P 3306 -u your_user -p
如果成功连接,表示 MySQL 的远程访问配置正确。
1.3.3. 网络连通性和认证设置
网络连通性测试:从应用主机上测试到 Redis 和 MySQL 服务的连通性。可以使用 ping
或 telnet
命令检查 IP 和端口的连接状态:
# 测试 Redis 连接
telnet <redis_ip> 6379
# 测试 MySQL 连接
telnet <mysql_ip> 3306
如果连接成功说明网络连通性没有问题。
认证配置:
- 确保 Redis 和 MySQL 的认证信息(如用户名、密码)正确配置在 Spring Boot 项目中,并且可以成功访问。
- 对于 Redis,使用密码认证的配置项是
spring.redis.password
。 - 对于 MySQL,认证配置项包括
spring.datasource.username
和spring.datasource.password
。
2.Spring Boot 与 Redis 的集成
2.1. RedisTemplate 配置
RedisTemplate
是一个通用的模板类,适用于操作 Redis 中的多种数据结构,包括字符串、哈希、列表、集合等。我们可以通过 RedisTemplate
来方便地执行 Redis 的增删查改操作。为了确保数据的可读性和兼容性,我们通常需要自定义 RedisTemplate
的序列化配置。
2.1.1创建 RedisTemplate
Bean
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
// 创建 RedisTemplate 实例
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 设置连接工厂
template.setConnectionFactory(redisConnectionFactory);
// 配置 key 的序列化器
template.setKeySerializer(new StringRedisSerializer());
// 配置 value 的序列化器
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
// 配置 hash key 的序列化器
template.setHashKeySerializer(new StringRedisSerializer());
// 配置 hash value 的序列化器
template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
// 初始化 RedisTemplate 配置
template.afterPropertiesSet();
return template;
}
}
配置解析
1.创建 RedisTemplate Bean:
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { ... }
- 使用
@Bean
注解,将RedisTemplate
注册为 Spring 容器的一个 Bean,使得应用的其他组件可以通过依赖注入@Autowired
来直接使用它。 - 泛型
<String, Object>
表示此RedisTemplate
的键类型为String
,值类型为Object
。这种配置较为通用,支持存储字符串和对象等多种类型。
2.设置连接工厂:
template.setConnectionFactory(redisConnectionFactory);
RedisConnectionFactory
是 Redis 连接的工厂接口,它负责管理 Redis 连接的创建和配置。- Spring Boot 通常会自动配置一个
RedisConnectionFactory
(基于 Lettuce 或 Jedis 的客户端)供我们使用。这里将连接工厂传给RedisTemplate
,确保其能够与 Redis 服务器建立连接。 - 没有配置连接工厂,
RedisTemplate
将无法连接到 Redis 服务,导致操作失败。
3.配置键的序列化器:
template.setKeySerializer(new StringRedisSerializer());
setKeySerializer
用于设置键(key)的序列化方式。StringRedisSerializer
是一种将键转换为字符串格式的序列化器,它将键序列化为 UTF-8 字符串,并以二进制的形式存储在 Redis 中。- 采用字符串序列化器使键在 Redis 中存储时具有可读性,例如你可以在 Redis CLI 中直接查看键的名称,这对于调试和维护非常有帮助。
4.配置值的序列化器:
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
setValueSerializer
用于设置值(value)的序列化方式。GenericJackson2JsonRedisSerializer
是一个 JSON 序列化器,使用 Jackson 库将对象序列化为 JSON 字符串存储。- 这样配置的好处是,值在 Redis 中会以 JSON 格式存储,不仅可读性强,而且便于与其他系统的数据交互。如果需要从 Redis 中获取复杂对象数据,
GenericJackson2JsonRedisSerializer
还支持反序列化,将 JSON 字符串转换回 Java 对象。
5.配置哈希键的序列化器:
template.setHashKeySerializer(new StringRedisSerializer());
setHashKeySerializer
设置哈希数据类型的键(hash key)的序列化器。- Redis 的哈希数据类型允许在一个键下存储多个字段(即子键和值对),这些字段可以独立操作。
StringRedisSerializer
将哈希键序列化为字符串,使得存储的哈希键具备可读性,方便查看。 - 例如在 Redis 中,你可以看到每个哈希键的具体内容,这在操作哈希类型时非常直观。
6.配置哈希值的序列化器:
template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
setHashValueSerializer
设置哈希数据类型的值(hash value)的序列化器。- 我们使用
GenericJackson2JsonRedisSerializer
将哈希值序列化为 JSON 格式,以确保哈希类型的值在 Redis 中以 JSON 形式存储。 - JSON 格式不仅便于阅读,还支持复杂对象的存储,适合项目中有嵌套对象或自定义数据结构的需求。
7.初始化 RedisTemplate 配置:
template.afterPropertiesSet();
afterPropertiesSet()
方法用于初始化RedisTemplate
的配置。- 这个方法会检查
RedisTemplate
的各个属性是否已正确设置,确保RedisTemplate
就绪以供使用。这一步是 Spring 处理配置属性的通用做法,避免运行时出现未初始化的错误。
2.1.2.创建 StringRedisTemplate Bean
StringRedisTemplate
是 RedisTemplate
的一个变种,它专门用于存储字符串数据。由于它默认的键和值的序列化方式都是 StringRedisSerializer
,可以直接用于存储和读取字符串数据。
StringRedisTemplate 配置代码
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
@Configuration
public class RedisConfig {
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
return new StringRedisTemplate(redisConnectionFactory);
}
}
配置解析
StringRedisTemplate
继承自RedisTemplate
,主要用于操作 Redis 中的字符串类型数据。StringRedisTemplate
的键和值的序列化方式都是StringRedisSerializer
,这使得所有存储在 Redis 中的数据都是可读的字符串格式。- 适用于数据都是简单字符串的场景,由于
StringRedisTemplate
默认的序列化方式已满足大多数字符串操作的需求,不需要额外配置序列化器。 - 注册为 Spring Bean:同样地,
@Bean
注解将StringRedisTemplate
注册为 Spring 容器中的一个 Bean,其他类可以使用@Autowired
注入StringRedisTemplate
实例,进行字符串数据的操作。
2.1.3.常用序列化方式总结
-
StringRedisSerializer:
- 用途:将键或简单的值序列化为字符串。
- 适用场景:通常用于键的序列化,确保键在 Redis 中以字符串存储,以便于直接查看和管理。
-
GenericJackson2JsonRedisSerializer:
- 用途:将对象序列化为 JSON 格式的字符串,并支持 JSON 反序列化回对象。
- 适用场景:通常用于值的序列化,尤其是需要存储复杂对象的情况。它可以确保数据的可读性,且 JSON 格式数据跨系统兼容性好。
2.2.CRUD 操作方法
2.2.1.RedisTemplate 常用方法
通用方法
除了特定数据结构的 opsFor...()
方法,RedisTemplate
还提供了一些通用的 Redis 操作方法:
delete(String key)
:删除键key
及其对应的值,返回是否成功删除。expire(String key, long timeout, TimeUnit unit)
:设置键key
的过期时间。hasKey(String key)
:检查键key
是否存在,返回true
或false
。keys(String pattern)
:获取所有符合模式pattern
的键。persist(String key)
:移除键key
的过期时间,使其永久有效。rename(String oldKey, String newKey)
:将键oldKey
重命名为newKey
。type(String key)
:返回键key
的数据类型。
1. opsForValue()
- 操作字符串(String)类型的数据
opsForValue()
用于操作 Redis 中的字符串数据,适用于简单的键值对操作。
常用方法:
set(String key, Object value)
:将键key
的值设置为value
,如果键已存在,则覆盖旧值。set(String key, Object value, long timeout, TimeUnit unit)
:将键key
的值设置为value
,并指定过期时间timeout
和时间单位unit
。get(String key)
:获取键key
对应的值。increment(String key, long delta)
:将键key
的值增加delta
,返回增加后的值(适用于整数类型的值)。decrement(String key, long delta)
:将键key
的值减少delta
,返回减少后的值。
2. opsForHash()
- 操作哈希(Hash)类型的数据
opsForHash()
用于操作 Redis 中的哈希结构(类似于键值对集合),适合存储对象的多个字段或属性。
常用方法:
put(String key, Object hashKey, Object value)
:设置哈希表key
中字段hashKey
的值为value
。putAll(String key, Map<?, ?> map)
:将整个map
存入哈希表key
中。get(String key, Object hashKey)
:获取哈希表key
中字段hashKey
的值。delete(String key, Object... hashKeys)
:删除哈希表key
中的一个或多个字段。entries(String key)
:获取哈希表key
中的所有键值对,返回一个Map
。hasKey(String key, Object hashKey)
:判断哈希表key
中是否存在字段hashKey
。size(String key)
:获取哈希表key
的字段数量。
3. opsForList()
- 操作列表(List)类型的数据
opsForList()
用于操作 Redis 中的列表结构,列表数据类型可以存储一个有序的字符串列表(支持从左、右两端插入和弹出)。
常用方法:
leftPush(String key, Object value)
:将value
插入到列表key
的左侧。rightPush(String key, Object value)
:将value
插入到列表key
的右侧。leftPop(String key)
:移除并返回列表key
的左侧第一个元素。rightPop(String key)
:移除并返回列表key
的右侧第一个元素。range(String key, long start, long end)
:获取列表key
中从start
到end
范围内的元素。size(String key)
:获取列表key
的长度。set(String key, long index, Object value)
:将列表key
中指定索引index
的元素设置为value
。
4. opsForSet()
- 操作集合(Set)类型的数据
opsForSet()
用于操作 Redis 中的集合结构,集合中的元素是唯一的,且无序。
常用方法:
add(String key, Object... values)
:向集合key
中添加一个或多个values
,返回添加的元素数量。remove(String key, Object... values)
:从集合key
中移除一个或多个元素。members(String key)
:获取集合key
中的所有元素。isMember(String key, Object value)
:判断value
是否是集合key
的成员。size(String key)
:获取集合key
的元素个数。intersect(String key, String otherKey)
:返回集合key
和otherKey
的交集。difference(String key, String otherKey)
:返回集合key
与otherKey
的差集。union(String key, String otherKey)
:返回集合key
与otherKey
的并集。
5. opsForZSet()
- 操作有序集合(Sorted Set/ZSet)类型的数据
opsForZSet()
用于操作 Redis 中的有序集合结构。有序集合中的每个元素都关联一个分数,按分数从小到大排序。
常用方法:
add(String key, Object value, double score)
:将value
添加到有序集合key
中,并设置分数score
。remove(String key, Object... values)
:从有序集合key
中移除一个或多个元素。score(String key, Object value)
:获取有序集合key
中元素value
的分数。rank(String key, Object value)
:返回有序集合key
中元素value
的排名(从小到大)。reverseRank(String key, Object value)
:返回有序集合key
中元素value
的排名(从大到小)。range(String key, long start, long end)
:根据索引范围start
和end
获取有序集合key
中的元素。rangeByScore(String key, double min, double max)
:根据分数范围min
和max
获取有序集合key
中的元素。size(String key)
:获取有序集合key
的元素个数。
2.2.2. 创建 RedisService 类(编写CRUD 方法)
首先,我们创建一个服务类 RedisService
,在这个类中编写对 Redis 数据的 CRUD 操作方法。假设我们已经在 Spring Boot 中配置好了 RedisTemplate
,我们可以直接注入它来使用。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class RedisService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 创建或存储数据
public void saveData(String key, Object value) {
redisTemplate.opsForValue().set(key, value);
}
// 创建或存储带过期时间的数据
public void saveDataWithExpiration(String key, Object value, long timeout, TimeUnit unit) {
redisTemplate.opsForValue().set(key, value, timeout, unit);
}
// 读取数据
public Object getData(String key) {
return redisTemplate.opsForValue().get(key);
}
// 更新数据(在 Redis 中,更新实际上是直接覆盖旧值)
public void updateData(String key, Object newValue) {
redisTemplate.opsForValue().set(key, newValue); // 覆盖旧值
}
// 删除数据
public void deleteData(String key) {
redisTemplate.delete(key);
}
// 检查键是否存在
public boolean exists(String key) {
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
}
1. saveData
- 创建或存储数据
public void saveData(String key, Object value) {
redisTemplate.opsForValue().set(key, value);
}
- 作用:将键
key
的值设置为value
。如果键已存在,则覆盖旧值。 - 解释:
opsForValue().set(key, value)
是一个写入操作,使用字符串类型数据结构将value
存储在 Redis 中的key
下。 - 应用场景:适用于需要保存字符串、简单对象或 JSON 格式化对象的数据。
2. saveDataWithExpiration
- 创建或存储带过期时间的数据
public void saveDataWithExpiration(String key, Object value, long timeout, TimeUnit unit) {
redisTemplate.opsForValue().set(key, value, timeout, unit);
}
- 作用:将键
key
的值设置为value
,并指定过期时间。 - 解释:该方法将数据保存到 Redis 中,并设置键的过期时间。
timeout
表示生存时间,unit
表示时间单位(如TimeUnit.SECONDS
)。 - 应用场景:适用于存储有时效性的数据,例如用户会话、验证码等。
3. getData
- 读取数据
public Object getData(String key) {
return redisTemplate.opsForValue().get(key);
}
- 作用:根据键
key
获取 Redis 中存储的值。 - 解释:
opsForValue().get(key)
方法从 Redis 中读取key
的值。如果key
不存在,返回null
。 - 应用场景:适用于读取简单的键值数据,例如读取用户信息、会话信息等。
4. updateData
- 更新数据
public void updateData(String key, Object newValue) {
redisTemplate.opsForValue().set(key, newValue);
}
- 作用:更新键
key
的值为newValue
。 - 解释:Redis 没有直接的“更新”操作,使用
set
方法可以实现覆盖更新效果。如果key
存在,则直接覆盖旧值;如果key
不存在,则创建新数据。 - 应用场景:适用于需要修改 Redis 中现有数据的场景。
5. deleteData
- 删除数据
public void deleteData(String key) {
redisTemplate.delete(key);
}
- 作用:删除 Redis 中指定的键值对。
- 解释:
delete(key)
方法会将 Redis 中指定的key
删除,如果key
存在则成功删除,如果key
不存在则无影响。 - 应用场景:适用于需要清理缓存数据、移除不再需要的数据的场景。
6. exists
- 检查键是否存在
public boolean exists(String key) {
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
- 作用:检查键
key
是否存在于 Redis 中。 - 解释:
hasKey(key)
方法返回一个Boolean
,表示key
是否存在。此处使用Boolean.TRUE.equals(...)
来避免null
值的问题。 - 应用场景:适用于在执行操作前检查数据是否已存在,避免无效操作。
2.3.缓存配置
RedisTemplate组件用于直接操作 Redis 数据,RedisCacheManager组件用于管理 Spring Cache 注解的缓存。
在 Spring Boot 中,将 Redis 作为缓存管理器可以通过 RedisCacheManager
实现,并利用 Spring Cache 的抽象功能。通过这种方式,可以很方便地使用缓存注解(如 @Cacheable
、@CachePut
和 @CacheEvict
)来管理 Redis 缓存。
配置 Redis 作为缓存管理器
首先,我们需要创建一个配置类,用于设置 Redis 缓存的默认行为,例如键值的序列化方式和缓存条目的过期时间。
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.Duration;
@Configuration
@EnableCaching // 启用 Spring Cache 缓存功能
public class CacheConfig {
@Bean
public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
// 配置 Redis 缓存的默认设置
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(30)) // 设置缓存过期时间为 30 分钟
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())) // 设置键的序列化器
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer())); // 设置值的序列化器
// 创建 RedisCacheManager 并应用配置
return RedisCacheManager.builder(redisConnectionFactory)
.cacheDefaults(config)
.build();
}
}
配置类代码详解
1. @Configuration
和 @EnableCaching
注解
@Configuration
@EnableCaching
@Configuration
:将此类标记为一个配置类,用于定义 Spring Bean 和配置项。@EnableCaching
:启用 Spring Cache 的缓存功能。启用后,Spring 会自动检测和管理带有缓存注解(如@Cacheable
、@CachePut
、@CacheEvict
)的方法,从而自动实现缓存的存储和读取。
2. 创建 CacheManager
Bean
@Bean
public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
-
CacheManager
是 Spring Cache 的核心组件,负责管理缓存的生命周期、存取和删除操作。我们在这里定义了一个CacheManager
Bean,利用RedisCacheManager
将 Redis 作为缓存的存储介质。 -
RedisConnectionFactory
是 Redis 的连接工厂,用于与 Redis 建立连接。在这里,我们将RedisConnectionFactory
注入到RedisCacheManager
中,确保CacheManager
可以正常连接 Redis 服务器。
3. 配置 Redis 缓存行为(RedisCacheConfiguration
)
在这部分代码中,我们定义了缓存的默认配置(如过期时间和序列化方式)。这些配置只影响通过 CacheManager
和缓存注解(如 @Cacheable
)进行的缓存操作,不会影响 RedisTemplate
。
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(30))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
-
RedisCacheConfiguration.defaultCacheConfig()
:RedisCacheConfiguration
是 Redis 缓存的配置类,用于定义缓存的行为。defaultCacheConfig()
方法获取一个默认的缓存配置对象。- 我们可以在这个配置对象的基础上定制缓存的行为,例如设置过期时间和序列化器。
-
entryTtl(Duration.ofMinutes(30))
:entryTtl
方法用于设置缓存条目的过期时间,即缓存的生存时间(TTL)。- 在这里,我们将缓存的过期时间设置为 30 分钟,这意味着缓存中的数据在存储 30 分钟后将自动失效。
- 通过设置合理的过期时间,可以确保缓存中的数据是较新的,有助于数据的一致性。
-
serializeKeysWith( ......StringRedisSerializer()))
:serializeKeysWith
方法用于设置缓存键(key)的序列化方式。StringRedisSerializer
是一个将数据序列化为字符串的序列化器,适用于缓存键的序列化。使用字符串格式存储键便于在 Redis 中查看缓存键名,方便管理和调试。
-
serializeValuesWith( ....... GenericJackson2JsonRedisSerializer()))
:serializeValuesWith
方法用于设置缓存值(value)的序列化方式。GenericJackson2JsonRedisSerializer
使用 JSON 格式将对象序列化为 JSON 字符串。JSON 格式具有良好的可读性和跨系统兼容性,非常适合存储复杂的对象数据。- 这样配置后,所有通过缓存注解存入 Redis 的缓存数据将以 JSON 格式存储,便于在 Redis 中查看缓存内容,同时支持反序列化为 Java 对象。
4. 创建 RedisCacheManager
实例
return RedisCacheManager.builder(redisConnectionFactory)
.cacheDefaults(config)
.build();
-
RedisCacheManager.builder(redisConnectionFactory)
:- 使用 Redis 连接工厂
redisConnectionFactory
创建一个RedisCacheManager
的构建器。 - 连接工厂为
RedisCacheManager
提供与 Redis 服务器的连接,使其能够将缓存数据存储到 Redis 中。
- 使用 Redis 连接工厂
-
.cacheDefaults(config)
:cacheDefaults
方法用于指定 Redis 缓存的默认配置。这里将上面定义的config
(包含序列化方式和过期时间)作为 Redis 缓存的默认配置。- 所有使用
RedisCacheManager
存储的缓存数据将遵循该默认配置,使缓存的行为和存储格式一致。
-
.build()
:- 最终调用
build()
方法创建RedisCacheManager
实例,并将其注册为CacheManager
Bean。 - 通过
RedisCacheManager
管理缓存后,所有使用缓存注解(如@Cacheable
)的缓存数据将存储在 Redis 中,并按照指定的配置进行序列化和过期管理。
- 最终调用
2.4.缓存注解使用
1. @Cacheable
注解
@Cacheable
注解用于将方法的返回值缓存起来。下次调用该方法时,如果参数相同,Spring 会直接从缓存中返回结果,而不再执行方法体。这可以减少对数据库或外部服务的重复访问,提升性能。
使用方法
@Cacheable(value = "userCache", key = "#id", unless = "#result == null")
public User getUserById(Long id) {
System.out.println("Executing getUserById for id: " + id);
return findUserInDatabase(id); // 模拟数据库查询
}
参数详解
-
value
:指定缓存区域的名称,相当于缓存的命名空间或前缀(例如userCache
)。所有userCache
缓存区的键都会以userCache::
开头。 -
key
:指定缓存的键。使用#参数名
来引用方法的参数,如#id
表示将id
参数作为缓存键的一部分。最终 Redis 中的键可能是userCache::1
(假设id=1
)。 -
unless
(可选):指定条件,当条件成立时不缓存结果。例如,unless = "#result == null"
表示如果返回值为null
,则不缓存。
应用场景
-
数据查询操作:适用于频繁查询的操作,比如从数据库中获取用户信息、商品详情等。这些数据变化不频繁,使用
@Cacheable
缓存可以大大减少数据库访问。 -
API 调用:对于一些耗时的第三方 API 请求,可以缓存其返回结果,避免频繁调用外部接口。
2. @CachePut
注解
@CachePut
注解用于更新缓存数据,与 @Cacheable
不同的是,@CachePut
每次都会执行方法体,并将返回值更新到缓存中。它主要用于在更新数据的同时刷新缓存。
使用方法
@CachePut(value = "userCache", key = "#user.id")
public User updateUser(User user) {
System.out.println("Executing updateUser for id: " + user.getId());
return updateUserInDatabase(user); // 模拟更新数据库
}
参数详解
-
value
:指定缓存区域名称,通常为与查询操作相同的命名空间(例如userCache
)。 -
key
:指定缓存的键,通常是方法参数的某个属性,如#user.id
表示将user
对象的id
作为缓存键。
应用场景
-
数据更新操作:适用于更新数据库信息的场景。在数据库更新成功后,
@CachePut
会自动更新缓存中的数据,保证缓存和数据库的数据一致性。 -
缓存同步:
@CachePut
保证每次都更新缓存,适合在需要同步缓存与数据库的情况下使用。
3. @CacheEvict
注解
@CacheEvict
注解用于清除缓存中的数据。可以指定删除特定的缓存项,也可以通过配置 allEntries
参数清空整个缓存区域。
使用方法
@CacheEvict(value = "userCache", key = "#id", beforeInvocation = true)
public void deleteUserById(Long id) {
System.out.println("Executing deleteUserById for id: " + id);
deleteUserFromDatabase(id); // 模拟从数据库中删除用户
}
参数详解
-
value
:指定缓存区域的名称(如userCache
)。 -
key
:指定要清除的缓存项的键。例如,key = "#id"
表示删除userCache
中以id
为键的缓存项。 -
allEntries
(可选):当设置为true
时,清除整个缓存区域内的所有缓存项。默认为false
。 -
beforeInvocation
(可选):当设置为true
时,在方法执行前清除缓存。默认值是false
,即方法成功执行后再清除缓存。这在方法可能抛出异常时非常有用,避免方法失败导致缓存不一致。
应用场景
-
数据删除操作:适用于在从数据库删除数据时,确保清除对应的缓存项。
-
清空缓存:
allEntries = true
时,适用于清空整个缓存区域。例如,定期清空用户缓存区中的所有数据,以确保数据的时效性。 -
防止缓存不一致:
beforeInvocation = true
可以在方法执行前清除缓存,适合可能因异常导致数据不一致的场景。
4.综合示例:业务逻辑层
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
@Service
public class UserService {
// 查询用户信息并缓存结果
@Cacheable(value = "userCache", key = "#id", unless = "#result == null")
public User getUserById(Long id) {
System.out.println("Fetching user from database, id: " + id);
return findUserInDatabase(id); // 模拟数据库查询
}
// 更新用户信息并刷新缓存
@CachePut(value = "userCache", key = "#user.id")
public User updateUser(User user) {
System.out.println("Updating user in database, id: " + user.getId());
return updateUserInDatabase(user); // 模拟数据库更新
}
// 删除用户信息并清除对应缓存
@CacheEvict(value = "userCache", key = "#id")
public void deleteUserById(Long id) {
System.out.println("Deleting user from database, id: " + id);
deleteUserFromDatabase(id); // 模拟数据库删除
}
// 清空整个 userCache 缓存区域
@CacheEvict(value = "userCache", allEntries = true)
public void clearAllUserCache() {
System.out.println("Clearing all entries in userCache");
}
// 模拟数据库操作
private User findUserInDatabase(Long id) {
return new User(id, "User" + id); // 返回一个模拟的用户对象
}
private User updateUserInDatabase(User user) {
return user; // 返回更新后的用户
}
private void deleteUserFromDatabase(Long id) {
// 模拟删除操作
}
}
UserService
使用场景:是面向应用的业务服务层,专注于处理用户相关的核心业务逻辑,同时结合简单的缓存策略,如缓存数据的查询、更新和删除等。 当你希望业务逻辑代码中自动处理缓存逻辑时,可以使用UserService
和相关的缓存注解。它适用于绝大多数需要缓存的业务场景,尤其是在缓存操作较为简单的情况下。RedisService
专注于提供对缓存的直接操作,例如存取、更新、删除、批量操作等。它将缓存的操作与核心业务逻辑分离,提供更灵活的缓存管理功能。在一些高级缓存操作中,可能涉及到一些需要手动控制的缓存策略,比如缓存过期时间的设置、缓存清理、批量删除等。
3. Redis 和 MySQL 的协同缓存机制
3.1.缓存策略设计:哪些数据适合缓存
-
高频查询的数据:
- 频繁查询的数据非常适合缓存,例如用户资料、商品信息、热门文章等。
- 这些数据的访问频率高,通过缓存可以显著减少对数据库的访问,降低数据库负载,提高响应速度。
- 示例:在电商平台中,用户反复访问的商品详情可以放入缓存,这样在用户查看商品时,系统会优先从缓存读取数据。
-
不经常变化的数据:
- 那些变化频率低、数据更新较少的内容非常适合缓存。因为一旦缓存,这些数据可以在较长时间内有效,减少缓存失效的频率。
- 示例:像一些系统配置项、字典数据等,这些内容的更新较少,可以长期缓存。例如在用户管理系统中,用户权限的基本配置可以缓存。
-
聚合查询结果:
- 一些复杂的查询可能涉及到多张表和多个条件的组合查询,执行这些查询通常会耗费资源。对于这些复杂查询的结果,可以缓存到 Redis 中,避免每次都执行繁琐的数据库查询。
- 示例:如果在社交媒体平台中查询用户的好友推荐列表,这个列表可能涉及到多个条件和数据的关联,通过缓存查询结果可以提升查询效率。
-
实时性要求不高的数据:
- 对于一些实时性要求不高的数据,可以通过缓存来加速访问。例如,用户的浏览历史、日志信息等。
- 示例:在新闻类应用中,用户的阅读记录可以缓存,因为这些数据的更新对用户影响不大,也不会影响其他用户的体验。
选择缓存数据的关键考量
在选择数据缓存到 Redis 时,需要平衡以下几个方面:
- 数据访问的频率:高频访问的数据更适合缓存,因为缓存可以显著降低频繁访问对数据库的负担。
- 数据更新的频率:频繁更新的数据可能不适合缓存,除非有专门的机制保持缓存数据和源数据的一致性。
- 数据的大小和结构:缓存的大小需要考虑 Redis 的内存限制。对于特别大的数据或复杂的数据结构,缓存设计需要小心,以避免 Redis 内存不足。
3.2.读写分离
3.2.1. 读取操作:优先使用缓存,加速查询
在读取操作中,我们遵循以下流程:
- 查询 Redis 缓存:首先尝试从 Redis 获取数据。
- 缓存命中:如果 Redis 中存在目标数据,则直接返回。
- 缓存未命中:如果 Redis 中没有数据,则查询 MySQL,将查询结果存入 Redis 供后续使用。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class ProductService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ProductRepository productRepository; // 使用 JPA Repository 访问 MySQL
private static final String CACHE_PREFIX = "productCache::";
// 读取操作 - 查询商品信息
public Product getProductById(Long productId) {
// 1. 从缓存中获取数据
String cacheKey = CACHE_PREFIX + productId;
Product product = (Product) redisTemplate.opsForValue().get(cacheKey);
// 2. 如果缓存命中,直接返回
if (product != null) {
System.out.println("Cache hit for product ID: " + productId);
return product;
}
// 3. 缓存未命中,查询数据库
System.out.println("Cache miss for product ID: " + productId + ", querying database...");
product = productRepository.findById(productId).orElse(null);
// 4. 数据库有结果,将结果缓存,并设置过期时间
if (product != null) {
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
System.out.println("Data cached for product ID: " + productId);
}
return product;
}
}
代码详解
-
缓存查询:代码通过
redisTemplate.opsForValue().get(cacheKey)
从 Redis 获取数据。cacheKey
是由CACHE_PREFIX
和productId
组成的字符串,如"productCache::1"
,用于唯一标识商品。 -
缓存命中:如果
product
不为null
,说明缓存中存在数据,打印日志提示“缓存命中”并直接返回数据。 -
缓存未命中:如果缓存中没有目标数据,则查询 MySQL 数据库获取商品信息。成功获取到商品数据后,使用
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS)
将数据缓存到 Redis 中,并设置过期时间为 1 小时。
实际效果
- 加速响应:缓存命中时,直接从 Redis 返回数据,避免了数据库查询,显著加速响应速度。
- 减少数据库压力:Redis 缓存可以减少数据库的重复访问,降低数据库负载。
3.2.2. 写入操作:更新 MySQL 并同步更新 Redis 缓存
写操作会直接影响到数据库和缓存的数据一致性。为了保证一致性,我们在写操作中进行以下步骤:
- 更新数据库:首先将数据更新到 MySQL 数据库中,确保持久化存储。
- 更新缓存:数据库更新成功后,将新数据写入 Redis 中同步更新缓存。
// 写操作 - 更新商品信息
public Product updateProduct(Product product) {
// 1. 更新数据库中的商品信息
product = productRepository.save(product);
System.out.println("Database updated for product ID: " + product.getId());
// 2. 更新缓存
String cacheKey = CACHE_PREFIX + product.getId();
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
System.out.println("Cache updated for product ID: " + product.getId());
return product;
}
代码详解
-
更新数据库:代码通过
productRepository.save(product)
将商品信息更新到 MySQL 数据库,save
方法会插入或更新商品记录。 -
同步更新缓存:数据库更新成功后,将最新的商品数据写入 Redis 缓存中。通过
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS)
设置缓存键值对,并设定 1 小时的过期时间。这样,在下次读取时可以直接从缓存中获取最新数据,保证数据一致性。
实际效果
- 保持数据一致性:在更新数据库的同时更新缓存,确保缓存和数据库中的数据一致。
- 减少缓存失效风险:通过同步更新缓存避免了因缓存过期而产生的额外数据库查询。
3.2.3. 删除操作:清除数据库和缓存中的数据
当我们需要删除数据时,不仅要从数据库中删除数据,还要删除缓存中的对应数据,以确保缓存中不会再返回已删除的数据。
// 删除操作 - 删除商品信息
public void deleteProductById(Long productId) {
// 1. 从数据库删除商品信息
productRepository.deleteById(productId);
System.out.println("Database deleted for product ID: " + productId);
// 2. 删除缓存
String cacheKey = CACHE_PREFIX + productId;
redisTemplate.delete(cacheKey);
System.out.println("Cache deleted for product ID: " + productId);
}
代码详解
-
删除数据库记录:代码使用
productRepository.deleteById(productId)
从 MySQL 数据库中删除指定的商品记录。 -
删除缓存:数据库删除成功后,调用
redisTemplate.delete(cacheKey)
清除 Redis 中的缓存数据,避免读取到已被删除的数据。
实际效果
- 数据一致性:通过同步删除数据库和缓存中的数据,避免缓存和数据库的不一致。
- 释放缓存空间:清理无用数据,避免 Redis 中存储已删除的数据,节省内存资源。
3.3.缓存预热与过期策略
3.3.1.缓存预热(Cache Preheating)
1. 什么是缓存预热?
缓存预热是指在系统启动或高峰期到来之前,将一些热点数据提前加载到缓存中。这样可以避免系统启动时缓存为空导致的大量请求直接打到数据库,提升系统的响应速度,减少数据库压力。
2. 实现缓存预热的方式
方式一:使用 @PostConstruct
注解预热缓存
@PostConstruct
是一个 Java 标准注解,标注在方法上后,Spring 会在该 Bean 完成初始化后立即调用该方法。我们可以利用这个特性,在系统启动时自动将热点数据加载到缓存。
实现步骤和代码示例:
- 创建预热类:定义一个预热类,专门负责将热点数据加载到缓存中。
- 编写预热逻辑:在
@PostConstruct
方法中实现数据加载到缓存的逻辑。 - 配置缓存过期时间:在预热时为缓存数据设置合理的过期时间。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Component
public class CachePreheat {
@Autowired
private ProductService productService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String CACHE_PREFIX = "productCache::";
// 使用 @PostConstruct 注解使该方法在系统启动时自动执行
@PostConstruct
public void preheatCache() {
System.out.println("开始缓存预热...");
// 1. 获取需要预热的热点数据,例如热门商品列表
List<Product> hotProducts = productService.getHotProducts();
// 2. 将热点数据加载到缓存并设置过期时间
for (Product product : hotProducts) {
String cacheKey = CACHE_PREFIX + product.getId();
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS); // 设置缓存过期时间为1小时
}
System.out.println("缓存预热完成");
}
}
代码解释:
@Component
:将CachePreheat
类声明为 Spring 组件,使其被 Spring 容器管理。@PostConstruct
:让preheatCache()
方法在 Spring 容器初始化完成后自动执行,从而在系统启动时加载热点数据到缓存。productService.getHotProducts()
:调用ProductService
中的方法获取热点数据列表,例如用户常访问的热门商品。redisTemplate.opsForValue().set(...)
:将每条热点数据存入 Redis 缓存,并设置过期时间为 1 小时。
方式二:使用 @Scheduled
注解定时预热缓存
使用定时任务可以定期刷新缓存数据,确保热点数据持续更新。例如,每天凌晨重新加载热点数据,保持缓存数据的有效性。
示例代码:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Component
public class CachePreheatScheduler {
@Autowired
private ProductService productService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String CACHE_PREFIX = "productCache::";
// 每天凌晨2点执行缓存预热
@Scheduled(cron = "0 0 2 * * ?")
public void preheatCache() {
System.out.println("定时缓存预热开始...");
// 获取热点数据并存入缓存
List<Product> hotProducts = productService.getHotProducts();
for (Product product : hotProducts) {
String cacheKey = CACHE_PREFIX + product.getId();
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS); // 设置1小时的缓存过期时间
}
System.out.println("定时缓存预热完成");
}
}
代码解释:
@Scheduled(cron = "0 0 2 * * ?")
:每天凌晨 2 点执行缓存预热任务,cron
表达式控制执行时间。productService.getHotProducts()
:调用ProductService
获取热点数据。redisTemplate.opsForValue().set(...)
:将每条数据写入 Redis 缓存并设置过期时间,确保在一定时间内缓存数据是有效的。
3.3.2.缓存过期策略(Expiration Strategy)
缓存过期策略是指在缓存中设置数据的生命周期,以便在适当时间自动失效。合理的过期策略可以避免缓存和数据库之间的数据不一致,并减少缓存的内存占用。
1. 为什么需要缓存过期策略?
- 保证数据一致性:缓存中的数据和数据库数据可能会发生变化,设置缓存的过期时间可以有效减少这种不一致的时间。
- 释放内存资源:数据过期后自动释放缓存空间,避免缓存数据长期占用内存。
- 防止缓存雪崩:如果大量缓存项在同一时间过期,会导致系统突然压力增大,因此在设置过期时间时,可以引入随机化避免集中失效。
2. 缓存过期策略的具体实现
2.1 设置合理的 TTL(生存时间)
根据业务需求和数据的更新频率,选择合适的过期时间。对于变化频率高的数据,可以设置较短的 TTL;对于变化少的数据,可以设置较长的 TTL。
代码示例:
// 设置商品详情的缓存过期时间为1小时
String cacheKey = CACHE_PREFIX + product.getId();
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
代码解释:
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS)
:缓存商品详情,并设置生存时间为 1 小时,确保缓存中的数据不会长期存在。- 这样可以有效控制缓存生命周期,使得缓存数据在 1 小时后失效,减少数据不一致的风险。
2.2 添加过期时间的随机化
如果大量缓存项设置了相同的过期时间,可能导致它们在同一时间失效,产生缓存雪崩。在基础过期时间上添加一个随机数,避免集中失效。
代码示例:
int baseExpireTime = 60 * 60; // 基础过期时间为1小时
int randomTime = new Random().nextInt(300); // 随机增加0~300秒
redisTemplate.opsForValue().set(cacheKey, product, baseExpireTime + randomTime, TimeUnit.SECONDS);
代码解释:
baseExpireTime = 60 * 60
:基础过期时间设为 1 小时。randomTime = new Random().nextInt(300)
:生成 0 到 300 秒之间的随机数。baseExpireTime + randomTime
:在基础过期时间上加一个随机值,防止大量缓存数据同时失效,减少缓存雪崩的风险。
3.3.3.避免数据不一致的策略
缓存与数据库的数据可能会出现不一致的情况,特别是在高并发场景中。为此,常用的策略包括“先更新数据库,后删除缓存”、延时双删策略和使用消息队列同步缓存等方法。
1. 先更新数据库,后删除缓存
这种方法的思路是,在数据更新时,先更新数据库,然后再删除缓存中的数据。这样可以确保缓存和数据库的一致性,因为数据库中的数据是最新的。
示例代码:
public Product updateProduct(Product product) {
// 1. 更新数据库
product = productRepository.save(product);
// 2. 删除缓存
String cacheKey = CACHE_PREFIX + product.getId();
redisTemplate.delete(cacheKey); // 删除缓存
return product;
}
代码解释:
productRepository.save(product)
:更新数据库记录,确保数据库数据是最新的。redisTemplate.delete(cacheKey)
:删除 Redis 缓存,确保缓存数据不再存在。
2. 延时双删策略
在高并发情况下,删除缓存后可能会有并发请求查询未更新的数据库,并将旧数据再次写入缓存,导致数据不一致。延时双删策略可以有效解决这个问题。
延时双删的实现步骤
- 更新数据库:先更新数据库中的数据。
- 第一次删除缓存:更新数据库后立即删除缓存。
- 延时操作:线程短暂休眠一段时间,以处理其他并发请求。
- 再次删除缓存:延时后再次删除缓存,确保缓存中的旧数据完全清除。
示例代码:
public Product updateProduct(Product product) {
// 1. 更新数据库
product = productRepository.save(product);
// 2. 删除缓存
String cacheKey = CACHE_PREFIX + product.getId();
redisTemplate.delete(cacheKey);
// 3. 休眠500毫秒
try {
Thread.sleep(500); // 等待可能的并发请求完成
} catch (InterruptedException e) {
e.printStackTrace();
}
// 4. 再次删除缓存
redisTemplate.delete(cacheKey);
return product;
}
代码解释:
productRepository.save(product)
:更新数据库数据。redisTemplate.delete(cacheKey)
:第一次删除缓存中的数据,防止读取到旧数据。Thread.sleep(500)
:延时 500 毫秒,确保数据库已更新,其他并发请求已完成。redisTemplate.delete(cacheKey)
:再次删除缓存,确保缓存不再存有旧数据。
3. 使用消息队列同步缓存
在复杂场景中,可以引入消息队列(如 RabbitMQ、Kafka 等),在数据库更新后发送缓存更新的消息,由消息队列异步同步缓存,确保缓存数据的一致性。
实现步骤和示例代码:
- 发送缓存更新消息:在数据库更新后,将需要更新的缓存键发送到消息队列中。
- 监听并处理消息:在需要同步缓存的服务中,监听消息队列,根据接收到的消息内容更新或删除缓存。
示例代码:
以下的示例代码是已经在Spring boot中集成了RabbitMQ之后的代码。
生产者:在数据库更新后发送缓存更新消息。
public Product updateProduct(Product product) {
// 更新数据库
product = productRepository.save(product);
// 发送缓存更新消息
cacheUpdateSender.sendCacheUpdateMessage(product.getId());
return product;
}
消费者:监听队列并更新或删除缓存。
@Component
public class CacheUpdateListener {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ProductRepository productRepository;
private static final String CACHE_PREFIX = "productCache::";
@RabbitListener(queues = "cacheUpdateQueue")
public void handleCacheUpdate(Long productId) {
// 获取最新的数据
Product product = productRepository.findById(productId).orElse(null);
// 更新缓存
if (product != null) {
String cacheKey = CACHE_PREFIX + productId;
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
}
}
}
代码解释:
cacheUpdateSender.sendCacheUpdateMessage(product.getId())
:数据库更新后,发送缓存更新消息到消息队列。@RabbitListener(queues = "cacheUpdateQueue")
:监听消息队列,获取缓存更新请求。redisTemplate.opsForValue().set(...)
:接收到消息后更新 Redis 中的数据,并设置 1 小时的过期时间。
删除缓存后是否需要重新加载缓存?
在一些情况下,删除缓存后并不需要立即重新加载缓存,原因包括:
- 减少数据库压力:如果每次删除后立即重新加载,可能会增加数据库的查询压力。
- 热点数据自动回填:对于热点数据,当有用户访问时会自动回填缓存,因此可以根据访问需求动态更新。
3.4.缓存失效后清理缓存
存失效策略主要用于确保在数据更新或删除时,及时清除过期的缓存数据,避免缓存和数据库之间出现不一致。我们可以通过以下两种方式来实现缓存失效:
- 利用
@CacheEvict
注解:简化缓存失效操作,适合常规的缓存清理需求,包括更新、删除和批量清理。 - 手动清理缓存:使用 Redis API 手动删除缓存项,适合复杂业务需求下的精准控制,便于处理分布式缓存清理和条件清理的场景。
3.4.1.利用 @CacheEvict
注解实现缓存失效
@CacheEvict
是 Spring Cache 提供的注解,用于在方法执行后清除指定的缓存。它可以让我们在方法调用后自动清除缓存,不需要手动调用 Redis API,从而简化代码。
1. @CacheEvict
注解的常用参数
value
:指定缓存的名称(即命名空间)。缓存名称在 Redis 中通常作为缓存键的前缀。key
:指定要删除的缓存项的键,可以使用 SpEL 表达式(如#id
表示使用方法参数id
作为缓存键)。allEntries
:设置为true
时会清空指定缓存区域中的所有缓存项。默认为false
。beforeInvocation
:设置为true
时,缓存会在方法执行之前失效;默认为false
,即方法执行成功后才清除缓存。
2. 使用 @CacheEvict
实现缓存失效的场景
@CacheEvict
适合用于以下场景:
- 更新操作:当数据库中的数据被修改时,需要使缓存中的旧数据失效。
- 删除操作:当从数据库删除数据时,需要清除缓存中的对应数据,确保不会再访问到已删除的数据。
- 批量失效:有时需要清空整个缓存区域(如清空用户缓存),可以使用
allEntries=true
实现批量失效。
3. @CacheEvict
的具体使用
更新操作中使用 @CacheEvict
在更新操作中,可以使用 @CacheEvict
注解让缓存自动失效。假设我们有一个 ProductService
服务类,当更新产品信息时,我们希望自动清除该产品的缓存。
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.stereotype.Service;
@Service
public class ProductService {
@CacheEvict(value = "productCache", key = "#product.id")
public Product updateProduct(Product product) {
System.out.println("更新数据库中的产品信息");
// 这里假设执行数据库更新操作
return saveProductToDatabase(product); // 模拟数据库更新
}
private Product saveProductToDatabase(Product product) {
// 模拟数据库保存逻辑
return product;
}
}
代码解释:
@CacheEvict(value = "productCache", key = "#product.id")
:指定当updateProduct
方法执行后,清除 productCache 缓存区域中键为 product.id 的缓存项。saveProductToDatabase(product)
:更新数据库中的产品信息,确保数据库数据是最新的。- 效果:调用
updateProduct
方法时,缓存中旧的产品信息会自动失效,下次查询时会从数据库获取最新的数据。
删除操作中使用 @CacheEvict
在删除操作中,可以使用 @CacheEvict
注解让缓存中对应的数据自动失效。假设我们有一个删除产品的方法 deleteProductById
,希望删除数据库中的数据后清除缓存。
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.stereotype.Service;
@Service
public class ProductService {
@CacheEvict(value = "productCache", key = "#productId")
public void deleteProductById(Long productId) {
System.out.println("从数据库中删除产品信息,ID: " + productId);
// 这里假设执行数据库删除操作
deleteProductFromDatabase(productId); // 模拟数据库删除
}
private void deleteProductFromDatabase(Long productId) {
// 模拟数据库删除逻辑
}
}
代码解释:
@CacheEvict(value = "productCache", key = "#productId")
:删除productCache
中键为productId
的缓存项。deleteProductFromDatabase(productId)
:从数据库删除产品信息。- 效果:执行
deleteProductById
时,Redis 中缓存的产品数据会自动清除,避免用户访问到已删除的数据。
批量删除缓存中的所有项
在某些场景中(如大规模数据更新或系统重启),我们可能需要清空某个缓存区域的所有缓存项。这时可以使用 allEntries=true
实现批量清除。
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.stereotype.Service;
@Service
public class ProductService {
@CacheEvict(value = "productCache", allEntries = true)
public void clearAllProductCache() {
System.out.println("清空所有产品缓存");
// 执行批量清除缓存的逻辑
}
}
代码解释:
@CacheEvict(value = "productCache", allEntries = true)
:清空productCache
缓存区域中的所有缓存项。- 效果:执行
clearAllProductCache
方法时,productCache
中的所有缓存项将被清除。
3.4.2.手动清理缓存(在数据库更新后手动清理缓存)
在某些情况下,我们需要更精细地控制缓存清理逻辑。例如,可能需要在执行一系列更新操作后统一清除缓存,或者在复杂业务逻辑中根据特定条件手动清理缓存。此时可以手动调用 Redis API 来删除缓存。
1. 手动清理缓存的场景
手动清理缓存适合以下场景:
- 复杂的缓存清理逻辑:当需要清除多个缓存项或批量缓存时,手动清理可以提供更高的灵活性。
- 分布式环境:在分布式系统中,一个服务更新数据后需要通知其他服务清理缓存。
- 条件清理:当缓存清理条件较复杂时,手动清理可以实现更精准的控制。
假设在更新产品信息后,我们希望手动清理 Redis 中的缓存项,而不是使用 @CacheEvict
注解。
2.手动清理缓存的具体代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class ProductService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String CACHE_PREFIX = "productCache::";
public Product updateProduct(Product product) {
System.out.println("更新数据库中的产品信息");
// 1. 更新数据库中的产品信息
product = saveProductToDatabase(product);
// 2. 手动清理缓存
String cacheKey = CACHE_PREFIX + product.getId();
redisTemplate.delete(cacheKey);
System.out.println("手动清理缓存项: " + cacheKey);
return product;
}
private Product saveProductToDatabase(Product product) {
// 模拟数据库保存逻辑
return product;
}
}
代码解释:
saveProductToDatabase(product)
:更新数据库中的产品信息。redisTemplate.delete(cacheKey)
:手动删除 Redis 中对应的缓存项。- 效果:在
updateProduct
方法中手动清理缓存,确保缓存中的数据是最新的。可以根据业务需求灵活控制缓存的清理时机。
3.批量清理缓存示例
如果需要清理多个缓存项或指定前缀的缓存,可以使用 RedisTemplate
提供的批量删除方法。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Set;
@Service
public class ProductService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String CACHE_PREFIX = "productCache::";
public void clearProductCacheByPrefix(String prefix) {
// 1. 找到所有匹配指定前缀的缓存键
Set<String> keys = redisTemplate.keys(CACHE_PREFIX + prefix + "*");
// 2. 批量删除缓存项
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
System.out.println("清理缓存项: " + keys);
}
}
}
代码解释:
redisTemplate.keys(CACHE_PREFIX + prefix + "*")
:查找所有符合指定前缀的缓存键,返回一个键的集合。redisTemplate.delete(keys)
:批量删除这些缓存项,确保缓存中不会有过时的数据。- 效果:可以灵活地根据前缀清理多个缓存项,例如清理所有以
"productCache::category:"
开头的缓存项。
4. 分布式锁的实现
- Redis 的 SETNX 和 EXPIRE 实现:通过手动使用
SETNX
和EXPIRE
实现分布式锁,但在高并发场景中安全性不如 Redisson。 - Redisson 的 RLock 实现:Redisson 提供了简化的
RLock
API,用于安全可靠的分布式锁管理,适合复杂的业务需求,自动续期和异常安全,建议使用 Redisson 实现分布式锁。
Redis 在缓存和服务层的业务逻辑中都能发挥作用,适合那些读写频繁、实时性高的场景。在缓存中,它加速数据访问;在服务层,它为业务逻辑中的高并发控制、分布式锁、计数和限流等需求提供解决方案。
6.1.使用 SETNX
和 EXPIRE
命令手动实现分布式锁
1. 分布式锁的原理
Redis 的 SETNX
和 EXPIRE
命令可以实现基础的分布式锁功能:
- SETNX(SET if Not Exists):尝试设置一个键(代表锁),如果该键不存在,则设置成功,表示锁被成功获取;如果键已存在,表示锁被占用。
- EXPIRE:为锁设置过期时间,确保锁在持有方崩溃或出错时能自动释放,避免死锁。
实现步骤如下:
- 获取锁:使用
SETNX
设置一个锁键。如果返回成功,表示当前客户端获取到锁;否则获取锁失败。 - 设置过期时间:在获取锁成功后,使用
EXPIRE
为锁设置一个过期时间,防止锁因意外情况无法释放。 - 释放锁:在操作完成后,客户端主动删除锁,释放资源。
2. 在 Spring Boot 中实现 Redis 分布式锁
我们可以通过 StringRedisTemplate
来操作 Redis,创建一个分布式锁服务类 RedisLockService
,用于管理锁的获取和释放。
代码示例
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class RedisLockService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String LOCK_KEY = "lock_key"; // 锁的键名称
private static final int LOCK_EXPIRE = 10; // 锁的过期时间,10秒
// 尝试获取锁
public boolean tryLock() {
// 使用 SETNX 命令尝试获取锁
Boolean success = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, "1");
if (Boolean.TRUE.equals(success)) {
// 获取锁成功后,使用 EXPIRE 设置锁的过期时间
redisTemplate.expire(LOCK_KEY, LOCK_EXPIRE, TimeUnit.SECONDS);
return true; // 返回 true 表示成功获取到锁
}
return false; // 获取锁失败,返回 false
}
// 释放锁
public void unlock() {
redisTemplate.delete(LOCK_KEY); // 删除锁键,释放锁资源
}
}
代码详细解释
-
tryLock()
方法:用于尝试获取锁setIfAbsent(LOCK_KEY, "1")
:使用SETNX
命令尝试设置锁。如果成功返回true
,表示当前客户端获取到锁。如果返回false
,表示锁已被占用。expire(LOCK_KEY, LOCK_EXPIRE, TimeUnit.SECONDS)
:设置锁的过期时间为 10 秒,防止锁在客户端意外退出时长期占用。
-
unlock()
方法:用于释放锁delete(LOCK_KEY)
:删除锁键,释放锁资源。确保操作完成后锁被释放,让其他客户端可以继续获取该锁。
3. 使用示例:库存扣减操作
假设我们有一个商品库存扣减的场景,为了防止多个请求同时扣减库存导致超卖问题,可以通过分布式锁来确保同一时间只有一个请求可以扣减库存。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class InventoryService {
@Autowired
private RedisLockService redisLockService;
public String deductInventory(String productId) {
// 尝试获取锁
if (redisLockService.tryLock()) {
try {
// 执行扣减库存的逻辑
boolean success = reduceStock(productId);
return success ? "扣减成功" : "库存不足";
} finally {
// 确保操作完成后释放锁
redisLockService.unlock();
}
} else {
return "请稍后再试"; // 如果获取锁失败,提示用户稍后重试
}
}
private boolean reduceStock(String productId) {
System.out.println("扣减库存,产品ID: " + productId);
return true; // 假设库存扣减成功
}
}
使用示例的代码解释
tryLock()
:调用tryLock()
方法尝试获取锁,如果成功,则执行扣减库存操作。- 业务逻辑:调用
reduceStock(productId)
方法执行库存扣减操作,确保在持有锁的情况下完成操作。 - 释放锁:操作完成后无论成功与否,调用
unlock()
方法释放锁。
4. 锁的有效期与安全性
由于 SETNX
和 EXPIRE
不是原子操作,存在并发情况下锁的过期时间可能未设置的问题。虽然可以通过 Lua 脚本来实现原子性,但这种实现较为复杂。为确保分布式锁的可靠性和安全性,通常建议在高并发场景中使用 Redisson 来简化操作和提升安全性。
6.2.使用Redisson的RLock实现分布式锁
Redisson 是一个 Redis 客户端,封装了 Redis 的锁机制,并提供了 RLock
接口来管理分布式锁。相比使用 SETNX
和 EXPIRE
组合手动管理锁的方式,Redisson 的分布式锁具备以下优势:
- 自动续期:Redisson 的
RLock
支持锁的自动续期,确保在长时间持有锁时不意外失效。 - 可重入性:同一线程可以多次获取同一锁,不会造成死锁。
- 高可靠性:Redisson 提供了丰富的 API,可以灵活管理锁的超时和等待时间,适合高并发场景。
- Redisson 的自动续期机制设计的核心是确保锁在“正常持有”时不会意外释放,但一旦持有锁的线程中断或崩溃,续期停止,锁在过期时间到达后自动释放。这样既保证了锁在正常情况下的稳定性,也确保了线程异常时的自动释放。
- 在业务逻辑中,可能会出现递归调用或嵌套调用的情况,即一个持有锁的方法在调用的过程中又尝试获取同一锁。支持可重入性后,同一线程在持有锁的情况下可以再次获取锁,不会发生阻塞或死锁。
- 在分布式系统中,业务逻辑通常分为多个层次,每一层可能都有自己的锁需求。可重入性使得在多个层次调用同一锁时不需要担心冲突,同一线程可以在各个层次持有同一把锁。
6.2.1.使用步骤
1. 引入 Redisson 依赖
首先,在 Spring Boot 项目中添加 Redisson 的依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.15.3</version> <!-- 请使用最新版本 -->
</dependency>
2. 配置 RedissonClient Bean
在 application.properties
文件中配置 Redis 的连接信息:
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=yourpassword # 如果没有设置密码,可以省略这一行
spring.redis.timeout=3000 # 连接超时时间(毫秒)
接下来,创建一个 RedissonConfig
配置类,将 RedissonClient
配置为一个 Spring Bean。通过这个 Bean,可以在项目中方便地获取 RLock
对象,实现分布式锁。
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
return Redisson.create(config);
}
}
配置解释:
Config config = new Config();
:创建 Redisson 的配置对象。config.useSingleServer().setAddress("redis://localhost:6379");
:设置 Redis 服务器的地址。return Redisson.create(config);
:通过配置创建RedissonClient
实例,并注册为 Spring Bean。
3. 获取 RLock 对象
在业务代码中,可以通过 RedissonClient
获取 RLock
对象。 RLock
是一个分布式锁接口,提供了丰富的锁管理方法,支持可重入和自动续期功能。
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class LockService {
@Autowired
private RedissonClient redissonClient;
public void lockExample() {
RLock lock = redissonClient.getLock("my_lock");
// 使用 lock 进行分布式锁操作
}
}
6.2.2. RLock 的常用方法和使用示例
RLock 提供了多种方法来管理分布式锁的获取和释放,以下是常用方法及其使用示例:
(1)lock()
:阻塞获取锁
该方法会阻塞直到获取到锁,适合在没有超时等待要求的场景。
public void lockExample() {
RLock lock = redissonClient.getLock("my_lock");
lock.lock(); // 阻塞获取锁
try {
// 执行同步的业务逻辑
} finally {
lock.unlock(); // 释放锁
}
}
解释:
lock.lock()
:阻塞方式获取锁,如果锁已经被其他线程持有,则等待直至锁被释放。lock.unlock()
:释放锁,让其他线程可以获取该锁。
(2)tryLock()
:非阻塞获取锁
tryLock()
方法用于尝试获取锁。如果锁已经被持有,则立即返回 false
,不阻塞等待。
public boolean tryLockExample() {
RLock lock = redissonClient.getLock("my_lock");
boolean locked = lock.tryLock(); // 非阻塞获取锁
if (locked) {
try {
// 执行同步的业务逻辑
return true;
} finally {
lock.unlock(); // 释放锁
}
} else {
System.out.println("获取锁失败,锁已被持有");
return false;
}
}
解释:
tryLock()
:立即尝试获取锁,如果锁被持有则返回false
,表示获取锁失败;如果锁可用则返回true
,表示获取锁成功。
(3)tryLock(long waitTime, long leaseTime, TimeUnit unit)
:带超时的获取锁
这个方法支持等待和超时设置,可以在锁被其他线程持有时等待一定时间。如果在等待时间内未获取到锁则放弃请求,同时设置锁的持有时间。
- 等待时间:当前线程等待的最大时间。
- 持有时间:锁成功获取后自动释放的时间。
public boolean tryLockWithTimeoutExample() {
RLock lock = redissonClient.getLock("my_lock");
try {
// 等待时间为5秒,锁的持有时间为10秒
if (lock.tryLock(5, 10, TimeUnit.SECONDS)) {
try {
// 执行同步的业务逻辑
return true;
} finally {
lock.unlock(); // 释放锁
}
} else {
System.out.println("获取锁超时,未能成功获取锁");
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("获取锁时发生异常");
return false;
}
}
解释:
tryLock(5, 10, TimeUnit.SECONDS)
:尝试获取锁,等待最多 5 秒,如果在 5 秒内获取到锁,持有 10 秒后自动释放。
实际应用示例:在库存扣减中使用 RLock
假设一个电商系统中有一个库存扣减的操作,为了防止超卖,需要确保每次只有一个线程能够扣减库存,可以通过 RLock
实现这一功能。
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class InventoryService {
@Autowired
private RedissonClient redissonClient;
private static final String LOCK_KEY = "inventory_lock"; // 锁的键名称
public String deductInventory(String productId) {
RLock lock = redissonClient.getLock(LOCK_KEY);
try {
// 尝试加锁,等待时间5秒,锁定时间10秒
if (lock.tryLock(5, 10, TimeUnit.SECONDS)) {
try {
// 执行扣减库存的逻辑
boolean success = reduceStock(productId);
return success ? "扣减成功" : "库存不足";
} finally {
lock.unlock(); // 释放锁
}
} else {
return "请稍后再试"; // 获取锁失败
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "获取锁失败";
}
}
private boolean reduceStock(String productId) {
System.out.println("扣减库存,产品ID: " + productId);
return true; // 假设库存扣减成功
}
}
代码解释:
redissonClient.getLock(LOCK_KEY)
:获取一个分布式锁对象RLock
,锁的名称为"inventory_lock"
。tryLock(5, 10, TimeUnit.SECONDS)
:尝试获取锁,等待时间 5 秒,持有时间 10 秒。- 释放锁:通过
lock.unlock()
确保在业务逻辑执行完成后释放锁。
6.3. 加锁后为什么缓存数据仍可能被其他线程修改
这是因为Redis 分布式锁和缓存并没有直接联系,它们是两个独立的系统。锁的机制只是确保在同一时间只有一个线程可以执行某段代码,并不保证锁住的内容不会被其他线程修改。
在分布式锁场景中,缓存和锁是分离的:
- 锁控制代码逻辑的独占访问:通过
lock.tryLock()
,确保只有一个线程可以访问数据库和缓存更新逻辑(即从数据库获取数据并更新缓存)。 - 缓存并没有被锁住:在 Redis 中,缓存键的读写是独立的,任何线程都可以访问缓存并进行修改,锁并不能限制其他线程对缓存的访问。
5. 缓存穿透、击穿与雪崩的防范
5.1. 缓存穿透
5.1.1什么是缓存穿透
缓存穿透指的是请求的数据在缓存和数据库中都不存在,这种请求直接穿过缓存访问数据库。当恶意请求大量涌入时,缓存层无法拦截这些无效请求,导致数据库承受大量压力,从而影响性能。比如,用户可能不断请求一个不存在的商品 ID,Redis 缓存中不存在此数据,数据库中也没有。当此类请求频繁出现时,会绕过缓存层直接请求数据库,造成缓存穿透。
5.1.2.什么是布隆过滤器
布隆过滤器(Bloom Filter)是一种概率性数据结构,用于判断一个元素是否存在于集合中。它可以在较少的内存占用下实现快速判断,并且在大多数情况下准确可靠。布隆过滤器能够有效防止缓存穿透,拦截无效请求,从而减少数据库访问压力。
布隆过滤器的工作原理
布隆过滤器由一个长度为 m
的位数组(bit array)和 k
个不同的哈希函数组成。布隆过滤器的基本操作步骤如下:
-
插入元素:当插入一个元素(如 key)时,布隆过滤器会对该元素通过
k
个哈希函数分别计算出k
个哈希值,然后将位数组中对应的k
个位置设为1
。 -
查询元素:当需要判断一个元素是否存在时,对该元素进行相同的
k
个哈希计算,检查位数组中对应的k
个位置。如果所有位置的值都是1
,则表示该元素“可能存在”;如果有任意一个位置的值为0
,则可以确定该元素不存在。
布隆过滤器的误判问题
由于布隆过滤器的概率性,如果查询结果是“存在”,并不能完全保证该数据确实存在(存在一定的误判概率);但如果查询结果为“不存在”,则可以确定该数据绝对不存在。因此,布隆过滤器非常适合用于防止缓存穿透,因为它能高效筛除不存在的数据请求。
布隆过滤器的优势
-
拦截无效请求:布隆过滤器可以有效拦截不存在的数据请求,将大量无效请求挡在缓存层之外,避免对数据库的冲击。
-
高效、低成本:布隆过滤器占用的空间非常小,通过位数组和哈希函数实现,查询速度快且内存占用少,适合大规模数据场景。
-
减少数据库负担:通过提前过滤掉不存在的数据请求,可以显著降低数据库的负载,提升系统性能。
5.1.3.配置布隆过滤器
Spring Boot 中可以使用 Redisson 提供的布隆过滤器来实现防止缓存穿透。Redisson 是一个强大的 Redis 客户端,支持布隆过滤器等高级功能。我们将以下列代码来演示如何使用布隆过滤器。
代码示例
首先,确保项目中已经添加了 Redisson 依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.15.3</version> <!-- 使用最新版本 -->
</dependency>
然后,我们实现一个布隆过滤器服务 BloomFilterService
。在该服务中初始化布隆过滤器,将所有合法的 key 添加进去,并提供判断方法来检测 key 是否存在。
布隆过滤器服务类
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class BloomFilterService {
private static final String BLOOM_FILTER_NAME = "productBloomFilter";
@Autowired
private RedissonClient redissonClient;
// 初始化布隆过滤器
public void initBloomFilter() {
// 获取 Redis 布隆过滤器对象
RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(BLOOM_FILTER_NAME);
// 初始化布隆过滤器的大小和误判率
bloomFilter.tryInit(1000000L, 0.01); // 设置预期插入量为100万,误判率为1%
// 将所有合法的 key 添加到布隆过滤器中
bloomFilter.add("product_12345");
bloomFilter.add("product_67890");
// 可以通过批量读取数据库中的所有商品 ID,将它们批量加入布隆过滤器
}
// 判断 key 是否在布隆过滤器中
public boolean existsInBloomFilter(String key) {
RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(BLOOM_FILTER_NAME);
return bloomFilter.contains(key); // 判断 key 是否可能存在于布隆过滤器中
}
}
代码详细解释
-
初始化布隆过滤器:
- 使用
redissonClient.getBloomFilter(BLOOM_FILTER_NAME)
获取布隆过滤器实例。 - 调用
tryInit
方法设置布隆过滤器的大小(即预期的插入量)和误判率。例如设置为 100 万个 key,误判率为 1%。 - 误判率越低,布隆过滤器的空间开销越大。因此误判率的选择要在内存占用和准确性之间找到平衡。
- 使用
-
加入合法的 key:
- 通过
bloomFilter.add
将所有合法的 key(如数据库中所有商品 ID)加入布隆过滤器。可以从数据库中批量读取这些 key,然后依次加入布隆过滤器。
- 通过
-
判断 key 是否存在:
- 在处理请求时,首先调用
existsInBloomFilter
方法检查 key 是否在布隆过滤器中。 - 如果布隆过滤器返回
false
,表示该 key 不存在,不需要再访问数据库和缓存,直接返回空结果。 - 如果返回
true
,表示该 key 可能存在,继续访问缓存或数据库获取数据。
- 在处理请求时,首先调用
5.1.4.使用布隆过滤器示例
在缓存穿透的防范中,我们可以通过布隆过滤器提前筛除无效请求:
- 在系统启动时,将所有有效的 key(如数据库中已有商品的 ID)存入布隆过滤器。
- 在每次查询之前,先判断 key 是否存在于布隆过滤器中:
- 如果布隆过滤器判断为不存在,则直接返回空结果,不去查询数据库。
- 如果布隆过滤器判断为存在,则继续检查缓存,如果缓存未命中再去访问数据库
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class ProductService {
private static final String CACHE_KEY_PREFIX = "product_cache_";
@Autowired
private BloomFilterService bloomFilterService;
@Autowired
private StringRedisTemplate redisTemplate;
public String getProduct(String productId) {
// 1. 使用布隆过滤器判断是否可能存在该商品
if (!bloomFilterService.existsInBloomFilter("product_" + productId)) {
return "商品不存在"; // 布隆过滤器判断不存在,直接返回
}
// 2. 查询缓存
String cacheKey = CACHE_KEY_PREFIX + productId;
String cachedProduct = redisTemplate.opsForValue().get(cacheKey);
if (cachedProduct != null) {
return cachedProduct; // 缓存命中,返回缓存数据
}
// 3. 查询数据库(缓存未命中,且布隆过滤器判断可能存在)
String dbProduct = queryDatabase(productId);
if (dbProduct != null) {
// 将查询结果加入缓存,避免下次穿透
redisTemplate.opsForValue().set(cacheKey, dbProduct);
}
return dbProduct != null ? dbProduct : "商品不存在";
}
private String queryDatabase(String productId) {
// 模拟数据库查询
System.out.println("查询数据库,商品ID:" + productId);
// 假设数据库中存在商品 "product_12345"
if ("12345".equals(productId)) {
return "Product data for " + productId;
}
return null;
}
}
代码解释
-
布隆过滤器判断:在查询商品之前,首先调用
bloomFilterService.existsInBloomFilter
方法判断该商品 ID 是否可能存在。如果布隆过滤器返回false
,说明数据库中也不存在该数据,可以直接返回“该商品不存在”。 -
缓存查询:如果布隆过滤器判断数据可能存在,则继续查询 Redis 缓存。
-
数据库查询:如果缓存未命中(即 Redis 中不存在该商品的缓存数据),则查询数据库获取数据,并将数据缓存到 Redis 中,避免下次请求时再次访问数据库。
5.2.缓存击穿
5.2.1什么是缓存击穿?
缓存击穿指的是缓存中某个热点数据失效,导致大量请求直接访问数据库的情况。通常在高并发的场景下,一个热点数据被大量访问,突然失效后所有请求同时涌入数据库,增加数据库负担,可能导致性能瓶颈或崩溃。
典型场景
假设一个电商平台有一个非常热门的商品,所有用户都在查询该商品信息。当该商品的缓存突然过期后,大量请求会直接查询数据库,给数据库带来巨大压力。
缓存击穿的防范措施
-
加锁机制:在缓存失效时,通过加锁确保只有一个线程可以查询数据库并重建缓存,其他线程等待锁释放后再读取缓存。此方法适用于并发访问较高的热点数据。
-
热点数据永不过期:对极少数热点数据设置永不过期,手动更新这些数据的缓存,确保高频数据不会因缓存过期而直接冲击数据库。此方法适用于访问频繁且数据更新较少的场景。
5.2.2. 使用加锁机制解决缓存击穿
在缓存失效时,可以通过分布式锁确保只有一个线程访问数据库并更新缓存,其他线程等待锁释放后再读取缓存。这样可以避免大量并发请求直接冲击数据库。
实现原理
- 查询缓存:先查询缓存,如果缓存命中则直接返回结果。
- 加锁:如果缓存未命中,尝试获取分布式锁,确保只有一个线程可以访问数据库。
- 双重检查缓存:在获取锁后再次检查缓存,防止在获取锁期间缓存已被其他线程更新。
- 查询数据库并更新缓存:如果缓存仍然未命中,查询数据库并将数据写入缓存。
- 释放锁:数据库查询和缓存更新完成后,释放锁,让其他线程可以直接读取缓存。
这里为什么加锁之后还能让其他线程修改缓存:因为加锁的意思是对这段代码进行加锁,这段代码的操作只有被加锁的线程能够进行,但缓存是任何线程都可以修改,加锁只是对这段操作加锁,并不是对缓存加锁。
实现代码示例
以下是一个在 Spring Boot 中使用 Redisson
的加锁机制实现缓存击穿的防范示例:
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class CacheService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
private static final String LOCK_KEY = "product_lock_";
private static final String CACHE_KEY = "product_cache_";
public String getProduct(String productId) {
// 1. 查询缓存
String cacheValue = redisTemplate.opsForValue().get(CACHE_KEY + productId);
if (cacheValue != null) {
return cacheValue; // 缓存命中,直接返回
}
// 2. 缓存未命中,尝试加锁
RLock lock = redissonClient.getLock(LOCK_KEY + productId);
try {
if (lock.tryLock(5, 10, TimeUnit.SECONDS)) { // 获取锁,等待5秒,锁定10秒
// 3. 双重检查缓存是否已经被其他线程更新
cacheValue = redisTemplate.opsForValue().get(CACHE_KEY + productId);
if (cacheValue != null) {
return cacheValue; // 如果缓存已经被其他线程更新,直接返回
}
// 4. 查询数据库
String dbValue = queryDatabase(productId);
// 5. 更新缓存,设置过期时间
redisTemplate.opsForValue().set(CACHE_KEY + productId, dbValue, 10, TimeUnit.MINUTES);
return dbValue;
} else {
// 获取锁失败,等待缓存更新完成后重新获取
Thread.sleep(100); // 可以增加重试机制
return redisTemplate.opsForValue().get(CACHE_KEY + productId);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
lock.unlock(); // 6. 释放锁
}
}
private String queryDatabase(String productId) {
// 模拟数据库查询
System.out.println("查询数据库,商品ID:" + productId);
return "Product data for " + productId;
}
}
代码详细解释
-
查询缓存:首先从缓存中读取数据。如果缓存命中则直接返回结果,避免了不必要的锁操作。
-
加锁查询数据库:
- 如果缓存未命中,使用
Redisson
获取分布式锁lock
,确保只有一个线程可以查询数据库并更新缓存。 - 双重检查缓存:在成功加锁后再次检查缓存,防止在等待锁的过程中其他线程已经更新了缓存。
- 如果缓存未命中,使用
-
重建缓存:持有锁的线程从数据库中读取数据并更新到缓存中,同时设置缓存的过期时间。
-
释放锁:缓存更新完成后,释放锁,确保其他线程可以从缓存读取最新数据。
优势
- 防止缓存击穿:加锁确保在缓存失效时,只有一个线程查询数据库并更新缓存,避免大量并发请求直接冲击数据库。
- 双重检查减少重复操作:锁内再次检查缓存,确保其他线程不会重复查询数据库,减少数据库压力。
5.2.3. 设置热点数据永不过期
对于少数非常高频访问的热点数据,可以将其缓存设置为永不过期,这样确保数据不会失效,从而避免缓存击穿。
实现原理
- 热点数据初始化:系统启动时将热点数据加载到缓存中,设置为永不过期。
- 数据更新策略:通过数据库更新事件或定时任务主动更新热点缓存数据,而不依赖过期机制。
这种方式适合稳定且访问频繁的数据,例如电商首页的推荐商品、分类信息等,可以有效减少缓存失效的概率。
代码示例
以下示例展示如何在 Spring Boot 中将一些热点数据设置为永不过期,并通过手动更新的方式维护缓存数据。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class HotDataCacheService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String CACHE_KEY = "hot_product_cache_";
public String getHotProduct(String productId) {
// 查询缓存
String cacheValue = redisTemplate.opsForValue().get(CACHE_KEY + productId);
if (cacheValue != null) {
return cacheValue; // 缓存命中,直接返回
}
// 缓存未命中,加载数据库并更新缓存(永不过期)
String dbValue = queryDatabase(productId);
// 设置热点数据永不过期
redisTemplate.opsForValue().set(CACHE_KEY + productId, dbValue); // 不设置过期时间
return dbValue;
}
private String queryDatabase(String productId) {
// 模拟数据库查询
System.out.println("查询数据库,商品ID:" + productId);
return "Hot product data for " + productId;
}
// 手动更新缓存
public void updateHotProductCache(String productId, String productData) {
redisTemplate.opsForValue().set(CACHE_KEY + productId, productData); // 不设置过期时间
}
}
代码详细解释
-
查询缓存:首先尝试读取缓存数据。如果缓存命中则直接返回结果,减少数据库访问。
-
更新缓存:如果缓存未命中,查询数据库并更新缓存,不设置过期时间,确保该热点数据不会因过期而失效。
-
手动更新缓存:提供
updateHotProductCache
方法,以便在热点数据有变动时手动更新缓存内容。可以通过数据库更新事件或定时任务主动更新缓存。
优势和适用场景
- 适合高频访问的热点数据:特别是访问频率很高、不会频繁变化的数据,例如首页推荐商品、热门分类等。
- 避免缓存失效带来的冲击:热点数据不会因为缓存失效而直接查询数据库,有效减少数据库负载。
5.3.缓存雪崩
5.3.1.什么是缓存雪崩?
缓存雪崩指的是在某个时间点,大量缓存同时失效,导致所有请求直接访问数据库,给数据库带来巨大的压力,甚至可能导致系统崩溃。这种情况通常发生在以下场景:
- 缓存服务器宕机:整个缓存服务不可用,所有请求直接落到数据库。
- 大量缓存集中在同一时间过期:大量缓存设置了相同的过期时间,导致在某一时刻同时失效。
为防止缓存雪崩,可以采取以下措施:
-
缓存数据的过期时间设置随机化:在缓存过期时间的基础上,加上一个随机值,避免大量缓存同时失效。
-
缓存预热:在系统启动或高峰期到来之前,提前将热点数据加载到缓存中。
-
多级缓存:在本地增加一级缓存,如 Guava Cache,减轻对远程缓存的依赖。
-
限流和降级:在缓存失效时,对数据库的访问进行限流,必要时进行服务降级。
5.3.2. 设置不同的缓存失效时间
原理
为每个缓存数据设置一个基础过期时间,并在此基础上添加一个随机的时间偏移量,使缓存的过期时间分布在一定的范围内。这样可以避免大量缓存数据在同一时刻失效,分散过期时间点,从而减轻数据库的瞬时访问压力。
实现代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Service
public class CacheService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String CACHE_KEY_PREFIX = "product_cache_";
private static final int BASE_EXPIRE_TIME = 10; // 基础过期时间,单位:分钟
public void cacheProduct(String productId, String productData) {
// 生成随机过期时间,范围在 BASE_EXPIRE_TIME 到 BASE_EXPIRE_TIME + 5 分钟之间
int expireTime = BASE_EXPIRE_TIME + new Random().nextInt(5);
// 将数据存入 Redis,并设置不同的过期时间
redisTemplate.opsForValue().set(CACHE_KEY_PREFIX + productId, productData, expireTime, TimeUnit.MINUTES);
}
public String getProduct(String productId) {
// 从 Redis 中获取数据
return redisTemplate.opsForValue().get(CACHE_KEY_PREFIX + productId);
}
}
代码解释
-
基础过期时间:
BASE_EXPIRE_TIME
设置为 10 分钟,表示缓存的基础过期时间。 -
随机过期时间:在
BASE_EXPIRE_TIME
基础上,加上一个 0 到 4 分钟的随机数new Random().nextInt(5)
,使得缓存的失效时间在 10 到 14 分钟之间。这样不同的缓存条目会有略微不同的过期时间,避免在同一时刻集中失效。 -
缓存数据设置:
redisTemplate.opsForValue().set
方法将数据写入 Redis,并指定过期时间expireTime
。不同的缓存条目会有不同的失效时间,有效降低缓存雪崩的风险。
5.3.3. 缓存预热
缓存预热是指在系统启动时,提前将部分热点数据加载到缓存中,避免在系统运行后产生大量的缓存未命中情况。通过缓存预热,可以保证在系统启动或高峰期来临时,热点数据已经存在于缓存中,减少对数据库的访问压力。
实现思路
- 在系统启动时:加载一些常用的、访问频率高的数据到缓存中。
- 定时任务:定期更新缓存中的热点数据,确保缓存中的数据始终有效。
实现代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;
@Service
public class CacheWarmUpService {
@Autowired
private CacheService cacheService;
@PostConstruct
public void warmUpCache() {
// 获取热点数据的ID列表
List<String> hotProductIds = getHotProductIds();
// 将每个热点数据加载到缓存中
for (String productId : hotProductIds) {
String productData = queryDatabase(productId);
cacheService.cacheProduct(productId, productData); // 将数据预先缓存
}
}
private List<String> getHotProductIds() {
// 从数据库或配置中获取热点数据ID列表
return Arrays.asList("1001", "1002", "1003"); // 示例数据
}
private String queryDatabase(String productId) {
// 模拟数据库查询
return "Product data for " + productId;
}
}
代码解释
-
@PostConstruct
注解:该注解会在 Spring Boot 容器启动后自动调用warmUpCache()
方法。这可以确保在系统启动时,热点数据就已经加载到缓存中。 - 获取热点数据:
getHotProductIds
方法用于获取高访问量的商品 ID 列表,可以从数据库或配置文件中获取。 - 缓存热点数据:遍历热点商品 ID,通过
queryDatabase
方法查询数据库,然后调用cacheService.cacheProduct
方法将数据缓存起来。
5.3.4. 其他防范方法
1.限流与降级
在缓存雪崩发生时,即大量缓存突然失效,访问量瞬间增加,可以通过限流和降级的方式来保护数据库,避免数据库被大量请求压垮。
限流
限流可以控制进入数据库的请求量,将超出部分的请求暂时阻止或延迟,避免瞬时高并发对数据库造成过大的压力。常用的限流算法包括令牌桶、漏桶算法等。
降级
当缓存不可用时,可以考虑提供简化的数据或延迟响应,避免数据库压力过大。例如:
- 返回缓存中的旧数据,虽然不够实时,但可以减少数据库的访问。
- 对非核心业务进行降级,暂时返回空结果或简单提示,确保核心业务的正常运行。
2.多级缓存
多级缓存通过本地缓存和远程缓存结合使用的方式,进一步减轻远程缓存的压力,提升系统的访问速度。多级缓存常见的实现方式是本地缓存(如 Caffeine 或 Guava Cache) + Redis 远程缓存:
- 本地缓存:将热点数据存储在应用服务器本地的内存中,访问速度极快,可以应对短期的缓存雪崩。
- 远程缓存:使用 Redis 作为分布式缓存,缓存更多数据。
6.Redis 高可用架构方案
在 Redis 的高可用技术方案中,高可用指的是在出现节点故障或网络波动时,系统能够持续提供缓存服务,避免服务中断或性能严重下降。高可用的设计保证了即使某个缓存节点故障,缓存服务仍然可以通过故障转移等机制正常工作,减少了单点故障的风险,保证了系统的可靠性和稳定性。
6.1. Redis Sentinel(主从复制和故障监控)
Redis Sentinel 是 Redis 官方提供的一种高可用解决方案,主要通过主从复制和故障监控来实现。在 Redis Sentinel 架构中,Sentinel 负责监控 Redis 集群中的主从节点,当主节点出现故障时,Sentinel 会自动将一个从节点提升为主节点,保证服务的持续运行。这种架构适用于不需要数据分片的场景,适合数据量较小且对高可用性有要求的系统。
6.1.1.Redis Sentinel 高可用架构原理
Redis Sentinel 通过三个主要功能来实现 Redis 集群的高可用性:
-
主从复制:在 Sentinel 架构中,Redis 主节点负责处理写请求,并将数据同步到从节点。多个从节点是主节点的备份,只负责同步数据,不参与写操作。当主节点故障时,Sentinel 可以将从节点提升为主节点,从而实现高可用。
-
故障转移:每个 Sentinel 实例不断发送
PING
命令来检测 Redis 主节点和其他 Sentinel 的状态。如果在设定的时间范围内没有收到主节点的响应,Sentinel 会认为主节点故障,发起故障转移(Failover),选举某个从节点为新主节点。 -
通知应用程序:当主节点发生变化时,Sentinel 会将新主节点的地址信息推送给 Redis 客户端(应用程序),让客户端感知主节点的变化,避免因主节点切换而导致的连接错误。
6.1.2.在 Spring Boot 中集成 Redis Sentinel
在 Spring Boot 项目中集成 Redis Sentinel 包括以下几个主要步骤:
- 配置 Redis Sentinel 集群:配置 Redis 主从节点并启动多个 Sentinel 实例监控主节点的状态。
- 在 Spring Boot 中配置 Sentinel 集群:在
application.properties
中指定 Sentinel 集群的节点信息和主节点名称。Spring Boot 会自动管理主节点切换。 - 应用程序使用:在 Spring Boot 中直接使用
RedisTemplate
或StringRedisTemplate
进行 Redis 操作,Sentinel 会自动切换主节点,无需手动干预。
1.搭建并配置 Redis Sentinel 集群
配置 Redis 主从复制
首先需要配置 Redis 的主从复制,使得数据可以从主节点同步到从节点。
-
主节点配置:假设 Redis 主节点运行在
127.0.0.1:6379
,无需额外配置。 -
从节点配置:在 Redis 从节点的配置文件(如
redis-slave.conf
)中,添加如下配置,将该节点设置为主节点的从节点。replicaof 127.0.0.1 6379
这样,从节点会自动从主节点复制数据,保证数据一致性。
配置并启动 Redis Sentinel
接着需要在 Redis 服务器上配置 Redis Sentinel。创建或编辑 sentinel.conf
文件,添加以下配置来监控 Redis 主节点。
# 监控的主节点名称和地址
sentinel monitor mymaster 127.0.0.1 6379 2
# 在 5 秒内未响应则判定节点不可达
sentinel down-after-milliseconds mymaster 5000
# 故障转移最大超时 10 秒
sentinel failover-timeout mymaster 10000
# 故障转移时同步新主节点的从节点数量
sentinel parallel-syncs mymaster 1
sentinel monitor
:mymaster
是主节点名称,127.0.0.1:6379
是主节点地址,2
表示至少两个 Sentinel 节点判定主节点不可达时才进行故障转移。sentinel down-after-milliseconds
:指定 Sentinel 判断主节点失效的超时时间(5 秒)。sentinel failover-timeout
:设置故障转移的最大等待时间(10 秒)。sentinel parallel-syncs
:指定故障转移完成后,允许多个从节点并行同步新主节点。
启动 Redis Sentinel 服务
在每台需要运行 Sentinel 的服务器上启动 Sentinel 服务。一般至少配置三个 Sentinel 节点,以保证高可用性和故障切换的准确性。
redis-server /path/to/sentinel.conf --sentinel
2.在 Spring Boot 中配置 Redis Sentinel 集成
在 Spring Boot 项目中,直接通过配置文件指定 Redis Sentinel 信息,Spring Boot 会自动识别并连接到 Redis Sentinel 集群。
配置 application.properties
在 application.properties
中配置 Redis Sentinel 的主节点名称和 Sentinel 节点地址,Spring Boot 将会自动管理主节点切换,无需手动更改主节点地址。
# Redis Sentinel 集群的主节点名称和 Sentinel 节点地址
spring.redis.sentinel.master=mymaster
spring.redis.sentinel.nodes=127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381
spring.redis.password=yourpassword
spring.redis.sentinel.master
:指定 Redis 主节点名称,必须与sentinel.conf
中的名称一致。spring.redis.sentinel.nodes
:配置所有 Sentinel 节点的 IP 和端口,Spring Boot 会自动连接并监控这些 Sentinel 节点。spring.redis.password
:如果 Redis 设置了密码,可以在此处配置。
3.在 Spring Boot 中使用 Redis Sentinel
完成以上配置后,Spring Boot 会自动连接 Redis Sentinel 集群。当主节点发生故障时,Redis Sentinel 会自动将某个从节点提升为新的主节点,Spring Boot 无需手动干预,客户端会自动连接到新的主节点。
示例代码
在 Spring Boot 中,可以使用 StringRedisTemplate
或 RedisTemplate
来操作 Redis 数据。以下是一个简单的 Redis 服务类,展示了如何在应用中使用 Redis Sentinel:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class RedisService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void saveData(String key, String value) {
stringRedisTemplate.opsForValue().set(key, value);
}
public String getData(String key) {
return stringRedisTemplate.opsForValue().get(key);
}
}
在 RedisService
中,StringRedisTemplate
负责与 Redis Sentinel 集群交互,执行数据的读写操作。当 Redis Sentinel 发生故障转移时,Spring Boot 会自动更新主节点连接,因此无需额外的逻辑来处理主节点变化。
6.1.3.Redis Sentinel 的工作流程总结
-
故障检测:每个 Sentinel 节点不断向主节点、从节点和其他 Sentinel 节点发送
PING
命令,检测节点是否正常响应。 -
故障判定:当某个 Sentinel 节点在设定时间内未收到主节点的响应时,会将该主节点标记为主观下线状态(Subjectively Down,简称
SDOWN
)。当多个 Sentinel 节点认为主节点不可达时,主节点会被标记为客观下线(Objectively Down,简称ODOWN
),并触发故障转移。 -
故障转移:在主节点被判定故障后,Sentinel 集群会选举出一个健康的从节点,将其提升为新的主节点,并重新配置其他从节点连接到该新主节点。
-
通知客户端:故障转移完成后,Sentinel 将新主节点的信息通知给 Redis 客户端,使客户端应用自动切换到新的主节点。
6.1.4.Redis Sentinel 的特点
优势:
- 高可用性:当主节点故障时,Redis Sentinel 可以自动选择从节点进行故障转移,保证服务的持续可用。
- 自动主节点发现:应用程序无需手动调整主节点地址,Redis Sentinel 在主节点发生变化后自动通知客户端,Spring Boot 会自动连接新的主节点。
- 适合非分片场景:Redis Sentinel 不支持数据分片,因此适用于数据量相对较小、不需要水平扩展的场景。
局限性:
- 不支持分片:Redis Sentinel 只适用于单一主从结构,无法分片,数据量过大时需要 Redis Cluster 等其他分布式方案。
- 哨兵节点的高可用性依赖:为了确保故障切换的准确性和服务的稳定性,通常需要至少三个 Sentinel 节点。
6.2 Redis Cluster(数据分片和自动故障转移)
Redis Cluster 是 Redis 官方提供的一种高可用和高性能的分布式缓存方案,通过数据分片和自动故障转移来实现高可用性。Redis Cluster 将数据分片存储在多个主从节点上,支持水平扩展,适合大数据量和高并发需求的场景。在 Redis Cluster 中,每个主节点负责一部分数据,并有一个或多个从节点作为备份,当某个主节点发生故障时,从节点会自动提升为新的主节点,以确保缓存服务的正常运行。
6.2.1 Redis Cluster 高可用架构原理
Redis Cluster 通过以下几个主要功能来实现高可用性:
- 数据分片:Redis Cluster 将数据划分为 16384 个哈希槽(Hash Slots),每个节点负责一部分哈希槽,支持水平扩展。
- 自动故障转移:每个主节点拥有一个或多个从节点。当主节点出现故障时,从节点自动提升为主节点,确保数据的高可用性。
- 高并发支持:由于 Redis Cluster 分散数据存储,可以处理大规模数据请求,适合高并发场景。
6.2.2 在 Spring Boot 中集成 Redis Cluster
在 Spring Boot 项目中集成 Redis Cluster 包括以下几个主要步骤:
- 配置 Redis Cluster 集群:配置并启动多个 Redis 主从节点,创建 Redis Cluster 集群。
- 在 Spring Boot 中配置 Cluster 集群信息:在
application.properties
中指定 Redis Cluster 节点信息,Spring Boot 会自动管理节点连接和数据分片。 - 应用程序使用:在 Spring Boot 中直接使用
RedisTemplate
或StringRedisTemplate
进行 Redis 操作,Redis Cluster 会自动进行分片存储和主从切换。
1. 搭建并配置 Redis Cluster 集群
配置 Redis Cluster 的主从节点
在 Redis 集群中,每个节点需要独立的配置文件(例如 redis-7000.conf
、redis-7001.conf
等),以下是示例配置:
# redis-7000.conf
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 5000
appendonly yes
其他节点(7001、7002 等)配置类似,只需修改端口号即可。
启动 Redis 节点
启动所有 Redis 节点,例如 7000 至 7005 端口的 Redis 实例:
redis-server /path/to/redis-7000.conf
redis-server /path/to/redis-7001.conf
# ...依次启动其他节点
创建 Redis 集群
使用 redis-cli
命令行工具将这些节点添加到集群,并指定主从关系:
redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1
此命令会创建 3 个主节点(7000、7001、7002)和 3 个从节点(7003、7004、7005),实现主从配置和数据分片。
2. 在 Spring Boot 中配置 Redis Cluster 集成
在 Spring Boot 中,通过配置文件指定 Redis Cluster 的节点信息和连接属性,Spring Boot 会自动管理 Redis Cluster 集群。
配置 application.properties
# 配置 Redis Cluster 集群节点
spring.redis.cluster.nodes=127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005
# 最大重定向次数(适用于分片数据重定向)
spring.redis.cluster.max-redirects=3
# Redis 密码(如果 Redis 设置了密码)
spring.redis.password=yourpassword
spring.redis.cluster.nodes
:指定 Redis Cluster 集群中所有节点的 IP 和端口,Spring Boot 会自动连接到这些节点。spring.redis.cluster.max-redirects
:设置最大重定向次数,用于分片数据的重定向处理。spring.redis.password
:如果 Redis 设置了密码,可以在这里配置。
3. 在 Spring Boot 中使用 Redis Cluster
完成配置后,开发者可以直接使用 RedisTemplate
或 StringRedisTemplate
操作数据。Spring Boot 会根据 Redis Cluster 的节点信息自动进行分片管理和主从切换。
示例代码
以下是一个 Redis 服务类示例,展示了如何在 Spring Boot 中使用 Redis Cluster 进行数据的增删改查操作:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class RedisClusterService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
// 存储数据到 Redis Cluster
public void saveData(String key, String value) {
stringRedisTemplate.opsForValue().set(key, value);
}
// 从 Redis Cluster 获取数据
public String getData(String key) {
return stringRedisTemplate.opsForValue().get(key);
}
// 删除 Redis Cluster 中的数据
public void deleteData(String key) {
stringRedisTemplate.delete(key);
}
}
- 数据存储:使用
StringRedisTemplate
的opsForValue().set()
方法将数据存储到 Redis Cluster,数据会自动分配到对应的哈希槽。 - 数据读取:使用
opsForValue().get()
方法读取数据,Redis Cluster 会根据数据的哈希槽位置定位并返回数据。 - 自动故障转移:当某个主节点发生故障时,Redis Cluster 会自动将从节点提升为主节点,Spring Boot 无需额外配置即可继续使用。
6.2.3 Redis Cluster 的特点
优势
- 数据分片:Redis Cluster 将数据分片存储在多个节点上,支持水平扩展,适合大数据量场景。
- 自动故障转移:当主节点故障时,Redis Cluster 会自动提升从节点为主节点,实现自动故障转移,保障服务的高可用性。
- 高并发支持:分片结构允许 Redis Cluster 处理更多的并发请求,是高并发场景的理想选择。
局限性
- 跨节点事务支持有限:Redis Cluster 不支持多键操作的跨节点事务,分布在不同节点上的键无法进行原子操作。
- 节点之间的网络通信:Redis Cluster 的节点间需要互相通信,因此在网络不稳定的环境中可能导致集群不一致。
- 最低节点要求:Redis Cluster 要求至少 6 个节点(3 个主节点和 3 个从节点)才能实现高可用。
适用场景
- 大规模缓存系统:在需要处理大数据量的缓存场景中,Redis Cluster 提供数据分片和自动容错机制。
- 高并发的会话管理:适合处理大量用户会话,特别是在高并发应用中。
- 需要水平扩展的场景:Redis Cluster 通过分片实现了水平扩展,适合随着业务增长扩展数据容量的场景。
6.2.4.关于槽点的重新分配
1. 添加或删除节点后的槽点分配
当 Redis Cluster 中添加或删除节点时,Redis 不会自动重新分配槽点,因此需要手动进行槽点迁移。解决方法如下:
- 手动重新分配槽点:
- 使用
redis-trib.rb
或redis-cli --cluster reshard
命令,将槽点重新分配到新的节点或从即将删除的节点迁出。 - 该操作需要管理员手动执行,可以通过指定源节点、目标节点和槽数进行槽点迁移。
- 使用
- 自动化方案(可选):
- 通过 Spring Boot 中的定时任务(
@Scheduled
)检查集群状态,检测到节点增减后,自动调用redis-trib.rb
或redis-cli
执行槽点重新分配操作。
- 通过 Spring Boot 中的定时任务(
2. 负载均衡的槽点分配
Redis Cluster 不支持自动基于负载的槽点分配调整,因此实现负载均衡也需要手动操作。解决方法如下:
- 手动负载均衡:
- 定期使用 Redis 客户端(如 Jedis)或监控工具检查各节点的负载情况。
- 如果发现某些节点负载过高,可以手动执行
redis-trib.rb
或redis-cli --cluster reshard
命令,将部分槽点从负载较高的节点迁移至负载较低的节点,达到负载均衡。
- 自动化方案(可选):
- 在 Spring Boot 中通过定时任务(
@Scheduled
)自动监控节点负载。 - 结合负载监控逻辑,使用
ProcessBuilder
调用redis-trib.rb
脚本或redis-cli
执行槽点迁移,从而实现更灵活的负载均衡自动化。
- 在 Spring Boot 中通过定时任务(
8. Spring Boot 中的 Redis 消息队列实现
8.1.发布与订阅(Pub/Sub)
在 Spring Boot 中,利用 Redis 的发布/订阅(Pub/Sub)机制可以实现消息的实时推送与接收。这个过程分为三个主要步骤:
- 定义消息订阅者:创建一个订阅者,用来监听 Redis 频道的消息。
- 配置 Redis 的消息监听容器:利用
RedisMessageListenerContainer
来绑定频道和订阅者。 - 实现消息发布:通过
RedisTemplate
将消息发布到 Redis 频道。
8.1.1.四大组件
-
监听者(Listener):
- 每个监听者是一个自定义的类,用于接收和处理 Redis 频道的消息。
- 监听者实现了
MessageListener
接口,并重写onMessage
方法,当 Redis 频道接收到消息时,该方法会被自动调用。 - 每个监听者通常负责处理特定的频道消息内容,可以包含自定义的业务逻辑。
-
频道(Channel):
- Redis 频道是消息的逻辑分组,用于将消息发布到订阅的客户端或服务端。
- 在 Spring Boot 中,
ChannelTopic
表示 Redis 中的一个频道,频道名称通常以字符串表示,如"channel1"
和"channel2"
。 - 不同的频道可以承载不同类型的消息,实现了消息的分组和隔离。
-
适配器(MessageListenerAdapter):
MessageListenerAdapter
是连接RedisMessageListenerContainer
和自定义监听者(Listener)的桥梁。- 每个
MessageListenerAdapter
只能绑定一个监听者,但可以通过RedisMessageListenerContainer
监听多个频道。 - 适配器的主要作用是将监听者转换为 Redis 可识别的
MessageListener
,以便容器能够调用监听者的onMessage
方法。
-
容器(RedisMessageListenerContainer):
RedisMessageListenerContainer
是 Redis 消息监听器的核心管理组件。- 容器会持续监听 Redis 中的所有绑定频道,当指定频道中有消息发布时,容器会找到绑定的
MessageListenerAdapter
。 - 容器通过适配器将频道消息传递给监听者,从而触发监听者的
onMessage
方法。
8.1.2.定义监听者(Listener)
首先,我们定义两个监听者类 Channel1Subscriber
和 Channel2Subscriber
。这两个类分别监听 channel1
和 channel2
频道中的消息,并实现特定的处理逻辑。
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;
@Service
public class Channel1Subscriber implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(pattern);
String messageBody = new String(message.getBody());
System.out.println("Channel1 Subscriber received message from [" + channel + "]: " + messageBody);
// 处理来自 channel1 的消息的特定逻辑
}
}
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;
@Service
public class Channel2Subscriber implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(pattern);
String messageBody = new String(message.getBody());
System.out.println("Channel2 Subscriber received message from [" + channel + "]: " + messageBody);
// 处理来自 channel2 的消息的特定逻辑
}
}
代码解释
MessageListener
接口:两个监听者都实现了MessageListener
接口,这样可以使它们具备接收 Redis 消息的能力。onMessage
方法:当指定频道接收到新消息时,Redis 会调用监听者的onMessage
方法。pattern
参数:表示消息来自的频道名称,通常是字节数组,我们将其转换为字符串。message.getBody()
:获取消息的内容,将其转换为字符串以便处理。
@Service
注解:将监听者标记为 Spring Bean,使其可以被 Spring 容器管理和注入。
通过这个设置,每当 channel1
和 channel2
中有新消息发布时,Channel1Subscriber
和 Channel2Subscriber
会自动接收到该消息并输出到控制台。此时,两个监听者类只是负责接收消息,并执行特定的消息处理逻辑。
8.1.3.配置 Redis 消息监听器容器和适配器
为了让监听者接收消息,需要通过 RedisMessageListenerContainer
和 MessageListenerAdapter
进行配置。这里,我们使用 RedisConfig
配置类来完成这些配置。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
public class RedisConfig {
// 配置 Redis 消息监听器容器
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter channel1ListenerAdapter,
MessageListenerAdapter channel2ListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 将 channel1 的适配器与频道绑定
container.addMessageListener(channel1ListenerAdapter, new ChannelTopic("channel1"));
// 将 channel2 的适配器与频道绑定
container.addMessageListener(channel2ListenerAdapter, new ChannelTopic("channel2"));
return container;
}
// 配置 Channel1 的消息监听适配器
@Bean
public MessageListenerAdapter channel1ListenerAdapter(Channel1Subscriber channel1Subscriber) {
return new MessageListenerAdapter(channel1Subscriber);
}
// 配置 Channel2 的消息监听适配器
@Bean
public MessageListenerAdapter channel2ListenerAdapter(Channel2Subscriber channel2Subscriber) {
return new MessageListenerAdapter(channel2Subscriber);
}
}
代码解释
RedisConfig
配置类:
- 使用
@Configuration
注解声明为配置类,Spring 启动时会自动加载此配置类。
RedisMessageListenerContainer
容器:
- 作为 Redis 消息监听的核心容器,负责监听绑定的频道并将消息转发给绑定的监听者。
setConnectionFactory
:设置连接工厂,连接到 Redis 实例。addMessageListener
方法:绑定频道和适配器,使适配器可以监听特定的频道。- 第一个参数为
MessageListenerAdapter
,表示监听哪个适配器。 - 第二个参数为
ChannelTopic
,指定频道名称。
- 第一个参数为
- 通过
addMessageListener(channel1ListenerAdapter, new ChannelTopic("channel1"))
,将channel1ListenerAdapter
绑定到channel1
频道,类似地绑定channel2
频道和channel2ListenerAdapter
。
MessageListenerAdapter
:
channel1ListenerAdapter
方法:将Channel1Subscriber
监听者适配为 Redis 可用的MessageListenerAdapter
。channel2ListenerAdapter
方法:将Channel2Subscriber
监听者适配为MessageListenerAdapter
。- 每个
MessageListenerAdapter
都只能绑定一个监听者,确保每个适配器只处理一个监听者的消息接收。
8.1.4. 实现消息发布类
为实现发布消息到 Redis 频道,我们创建一个 MessagePublisher
类,利用 RedisTemplate
提供的 convertAndSend
方法将消息发布到指定的频道。这个发布类可以与 REST API 或其他服务调用相结合,方便消息的动态发布。
代码示例
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class MessagePublisher {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 发布消息到指定频道
public void publish(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
System.out.println("Published message to channel [" + channel + "]: " + message);
}
}
代码解释
-
MessagePublisher
类:使用@Service
注解将该类标记为 Spring 的 Bean,使其可以在其他组件中被注入和调用。 -
RedisTemplate
:- 通过
@Autowired
注入RedisTemplate
,该类提供了操作 Redis 的各种方法,包括发送消息。 RedisTemplate
是 Spring Data Redis 提供的用于执行 Redis 操作的工具,支持字符串、哈希、列表、集合等各种类型的操作。
- 通过
-
publish
方法:- 参数:
channel
表示目标频道的名称,message
表示要发送的消息内容。 convertAndSend
方法:通过RedisTemplate.convertAndSend
方法,将消息发送到指定的频道。- 第一个参数
channel
是频道名称,第二个参数message
是要发布的消息内容。
- 第一个参数
- 控制台输出:输出消息发布成功的日志,便于观察消息的发送情况。
- 参数:
效果:调用 publish
方法会将消息发布到指定的 Redis 频道,频道名称可以根据需要动态传入。
8.1.5. 测试发布与订阅
为了测试发布和订阅功能是否正常工作,我们可以创建一个 REST 控制器,通过 API 接口触发消息发布,然后观察控制台的输出,以确认订阅者是否接收到了来自不同频道的消息。
代码示例
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Autowired
private MessagePublisher messagePublisher;
// 发布消息的 API 接口
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam String channel, @RequestParam String message) {
messagePublisher.publish(channel, message);
return "Message published successfully to channel: " + channel;
}
}
代码解释
MessageController
控制器:- 使用
@RestController
注解,将该类标记为 REST API 控制层,可以通过 HTTP 请求与客户端交互。
- 使用
sendMessage
方法:- 使用
@GetMapping
注解暴露一个 GET 请求的 API 接口/sendMessage
,用于向指定频道发送消息。 - 请求参数:
channel
:指定要发布消息的频道名称。message
:指定要发布的消息内容。
- 方法逻辑:调用
messagePublisher.publish(channel, message)
将消息发布到指定频道,并返回成功消息给客户端。
- 使用
8.2.异步处理任务(Stream)
在 Spring Boot 中,利用 Redis Stream 实现异步任务队列是处理高并发任务的一种高效方案。Redis Stream 提供了多消费者、消费者组、消息确认等高级功能,可以适用于订单处理、实时数据处理等需要高并发和高可靠性的场景。我们可以在同一个消费者组中使用多个消费者,并且根据需求来动态分配任务处理逻辑。以下是 Redis Stream 异步任务队列的详细实现和说明。
Redis Stream 异步任务处理流程
- 任务生产者:任务生产者将任务消息推送到 Redis Stream 队列中。
- 创建消费者组:通过 Redis 的消费者组机制,多个消费者可以从同一个队列中读取任务,并保证每条消息只被一个消费者处理。
- 任务消费者:多个消费者从消费者组中取出任务,按需进行任务分配和负载均衡。
- 确认机制:消费者处理完任务后,向 Redis 确认已处理该消息,防止重复消费。
8.2.1. 任务生产者:将任务放入 Redis Stream
任务生产者负责将任务消息添加到 Redis Stream 中。任务内容可以是字符串、JSON 或键值对,每条消息都会获得一个唯一的 RecordId
,方便后续处理和确认。
代码示例
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class StreamTaskProducer {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String STREAM_NAME = "taskStream";
// 将任务消息添加到 Redis Stream
public RecordId produceTask(String taskData) {
var record = StreamRecords.objectBacked(taskData).withStreamKey(STREAM_NAME);
RecordId recordId = redisTemplate.opsForStream().add(record);
System.out.println("Produced task with ID: " + recordId.getValue() + " and data: " + taskData);
return recordId;
}
}
代码解释
StreamTaskProducer
类:作为任务生产者,将任务推送到 Redis Stream。produceTask
方法:- 参数
taskData
表示任务内容,可以是字符串、JSON 或更复杂的对象。 StreamRecords.objectBacked(taskData).withStreamKey(STREAM_NAME)
创建了一个StreamRecord
对象,将taskData
添加到taskStream
队列中。redisTemplate.opsForStream().add(record)
将任务记录添加到 Stream 中并返回一个RecordId
。
- 参数
当调用
produceTask
方法时,任务数据被添加到taskStream
中,等待消费者处理。
8.2.2. 创建消费者组
为了让多个消费者能够并发消费同一个任务队列的任务,我们可以创建一个消费者组。消费者组负责管理任务的分配、记录消息消费状态、未确认消息等。
代码示例
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class StreamGroupManager {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String STREAM_NAME = "taskStream";
private static final String GROUP_NAME = "taskGroup";
// 创建消费者组
public void createGroup() {
try {
redisTemplate.opsForStream().createGroup(STREAM_NAME, GROUP_NAME);
System.out.println("Consumer group created: " + GROUP_NAME);
} catch (Exception e) {
System.out.println("Group already exists or error: " + e.getMessage());
}
}
}
代码解释
StreamGroupManager
类:用于创建和管理消费者组taskGroup
。createGroup
方法:使用redisTemplate.opsForStream().createGroup()
方法创建一个消费者组。若组已存在,则会捕获异常,避免重复创建。
通过调用
createGroup
,消费者组taskGroup
创建成功,使多个消费者可以并发读取任务。
8.2.3. 多个消费者从同一个消费者组中读取和处理任务
在 Redis 中,一个消费者组可以包含多个消费者,每个消费者都有一个唯一的名称。这种设计能够实现任务的自动负载均衡和确认机制。多个消费者可以并发地从同一队列中读取任务,Redis 会根据任务的消费状态自动将任务分配给不同的消费者。
代码示例
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class ConsumerService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String STREAM_NAME = "taskStream";
private static final String GROUP_NAME = "taskGroup";
public void consumeTasks(String consumerName) {
List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
Object.class,
org.springframework.data.redis.connection.stream.Consumer.from(GROUP_NAME, consumerName),
StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed())
);
// 处理读取到的消息
if (messages != null) {
for (MapRecord<String, Object, Object> message : messages) {
System.out.println(consumerName + " processing task ID: " + message.getId() + ", Data: " + message.getValue());
// 确认消息已被处理
redisTemplate.opsForStream().acknowledge(GROUP_NAME, message);
}
}
}
}
代码解释
ConsumerService
类:该类通过传入不同的consumerName
参数来模拟多个消费者的工作。不同的consumerName
使得消费者组能够识别不同的消费者,并在同一组内为多个消费者分配任务。consumeTasks(String consumerName)
方法:Consumer.from(GROUP_NAME, consumerName)
:指定消费者组taskGroup
和当前消费者的名称consumerName
,从该组中获取未消费的消息。StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed())
:从上一次消费的位置继续消费,保证消息不会被重复消费。- 消息确认:每条消息在处理完毕后,通过
acknowledge
方法确认,防止消息被重复分配。
调用
consumeTasks
并传入不同的consumerName
,可以实现多消费者并发从同一任务队列中获取任务,Redis 会自动进行任务的负载均衡。
8.2.4.具体应用示例
在 Spring Boot 中,可以使用 @Scheduled
注解实现定时任务,以便模拟生产者持续产生任务,消费者持续轮询消费任务。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@EnableScheduling
public class MessageQueueApplication implements CommandLineRunner {
@Autowired
private StreamTaskProducer taskProducer;
@Autowired
private StreamGroupManager groupManager;
@Autowired
private ConsumerService consumerService;
private int taskCounter = 1;
@Override
public void run(String... args) throws Exception {
// Step 1: 创建消费者组(如果已经存在则跳过)
groupManager.createGroup();
}
// Step 2: 定时产生任务,每隔5秒产生一个新任务
@Scheduled(fixedRate = 5000)
public void produceTasks() {
String taskData = "Task data " + taskCounter++;
taskProducer.produceTask(taskData);
}
// Step 3: 消费者1轮询消费任务,每隔3秒检查新任务
@Scheduled(fixedRate = 3000)
public void consumer1Tasks() {
consumerService.consumeTasks("consumer1");
}
// Step 4: 消费者2轮询消费任务,每隔3秒检查新任务
@Scheduled(fixedRate = 3000)
public void consumer2Tasks() {
consumerService.consumeTasks("consumer2");
}
}
代码解释
@EnableScheduling
:启用 Spring 的定时任务功能。@Scheduled
注解:实现定时任务。produceTasks()
:每隔 5 秒钟调用一次,模拟任务生产者持续产生新任务。consumer1Tasks()
和consumer2Tasks()
:分别每隔 3 秒调用一次,模拟两个消费者持续轮询获取任务。
在 produceTasks()
中,任务计数器 taskCounter
自动递增,生成不同的数据内容,确保每个任务的数据是唯一的。
8.2.5. 不同任务类型的处理
在某些应用场景中,不同类型的任务可能需要不同的处理逻辑。我们可以在任务消息中添加“任务类型”字段,消费者在读取消息后,根据任务类型选择合适的处理方法。
代码示例
1.在任务消息中添加类型字段
Map<String, Object> taskData = new HashMap<>();
taskData.put("type", "TYPE_A"); // 设置任务类型
taskData.put("content", "Task content for TYPE_A");
// 将任务添加到 Stream 中
redisTemplate.opsForStream().add(StreamRecords.mapBacked(taskData).withStreamKey(STREAM_NAME));
2.在消费者中根据任务类型处理不同任务
public void consumeTasks(String consumerName) {
List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
Object.class,
org.springframework.data.redis.connection.stream.Consumer.from(GROUP_NAME, consumerName),
StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed())
);
if (messages != null) {
for (MapRecord<String, Object, Object> message : messages) {
// 解析任务类型
String taskType = (String) message.getValue().get("type");
// 根据任务类型调用不同的处理方法
if ("TYPE_A".equals(taskType)) {
handleTypeATask(message);
} else if ("TYPE_B".equals(taskType)) {
handleTypeBTask(message);
}
// 确认消息已被处理
redisTemplate.opsForStream().acknowledge(GROUP_NAME, message);
}
}
}
private void handleTypeATask(MapRecord<String, Object, Object> message) {
System.out.println("Handling TYPE_A task: " + message.getValue().get("content"));
}
private void handleTypeBTask(MapRecord<String, Object, Object> message) {
System.out.println("Handling TYPE_B task: " + message.getValue().get("content"));
}
代码解释
- 任务类型字段:在消息数据中添加
type
字段,用于指定任务类型。 - 任务筛选和处理:在消费时,根据
type
字段的值调用不同的处理方法,如handleTypeATask
和handleTypeBTask
。 - 消息确认:无论任务类型如何,处理完毕后通过
acknowledge
确认消息,防止重复消费。
通过在消息中添加 type
字段,可以实现多种类型任务的处理方案,满足复杂场景的需求。
8.3.任务调度(延迟队列)
通过 Redis 的 Sorted Set
可以轻松实现一个延迟队列,这种延迟队列适用于需要定时触发的任务或批处理任务场景。其基本思路是:在 Sorted Set
中,任务的 score
表示其执行时间戳。通过定期检查,将所有到期任务取出并执行,实现任务的延迟处理。
实现思路
- 任务添加:将任务添加到 Redis 的
Sorted Set
中,使用未来的时间戳作为任务的score
值,表示任务的到期时间。 - 任务检查与取出:设置定时任务定期检查
Sorted Set
中到期任务,将score
(时间戳)小于或等于当前时间的任务取出,这些任务即为已到期的任务。 - 任务处理与删除:处理到期任务,并将它们从
Sorted Set
中移除,确保任务不会重复执行。
8.3.1.两大组件
为了实现这个延迟队列,我们将定义两个主要组件:
- DelayedTaskProducer:将任务添加到 Redis 延迟队列中。
- DelayedTaskScheduler:定期检查 Redis 延迟队列,并处理到期任务。
1. DelayedTaskProducer
:延迟任务生产者
这个类负责将任务添加到 Redis 的 Sorted Set
中。我们使用任务的未来时间戳作为 score
来标记任务的到期时间。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class DelayedTaskProducer {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String DELAYED_TASK_QUEUE = "delayed_task_queue";
/**
* 添加延迟任务
*
* taskId 任务ID或描述
* delayInSeconds 延迟时间(秒)
*/
public void addDelayedTask(String taskId, long delayInSeconds) {
// 当前时间 + 延迟时间 = 任务的到期时间戳(秒)
long score = System.currentTimeMillis() / 1000 + delayInSeconds;
// 将任务加入 Redis Sorted Set,key 为任务 ID,score 为到期时间戳
redisTemplate.opsForZSet().add(DELAYED_TASK_QUEUE, taskId, score);
System.out.println("Added task " + taskId + " with delay " + delayInSeconds + " seconds.");
}
}
代码解释
DELAYED_TASK_QUEUE
:这是 Redis 中存储延迟任务的Sorted Set
键名,所有延迟任务都存储在这个集合中。addDelayedTask
方法:- 参数
taskId
:任务的唯一标识,可以是任务的 ID 或者任务的描述。 - 参数
delayInSeconds
:表示任务的延迟时间,以秒为单位。 - 计算到期时间
score
:System.currentTimeMillis() / 1000 + delayInSeconds
计算未来的时间戳,表示任务的到期时间。 - 添加到
Sorted Set
:使用redisTemplate.opsForZSet().add()
方法将任务添加到Sorted Set
中。taskId
作为集合的成员,score
表示任务的到期时间,用于排序。
- 参数
当你调用 addDelayedTask
方法时,任务会被添加到 Sorted Set
中,并根据 score
进行排序。即:到期时间最近的任务会排在集合前面,确保任务可以按顺序被处理。
2. DelayedTaskScheduler
:定期检查和处理到期任务
DelayedTaskScheduler
负责定期检查 Redis 中的 Sorted Set
,取出已到期的任务并处理。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.Set;
@Service
public class DelayedTaskScheduler {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String DELAYED_TASK_QUEUE = "delayed_task_queue";
/**
* 定期检查并处理到期任务
*/
@Scheduled(fixedRate = 5000) // 每隔5秒执行一次
public void processDelayedTasks() {
long now = System.currentTimeMillis() / 1000; // 当前时间戳(秒级)
// 获取所有到期任务(score 小于等于当前时间的任务)
Set<Object> taskIds = redisTemplate.opsForZSet().rangeByScore(DELAYED_TASK_QUEUE, 0, now);
// 遍历每个到期任务,执行处理并移除
if (taskIds != null && !taskIds.isEmpty()) {
for (Object taskId : taskIds) {
// 处理任务逻辑
System.out.println("Processing delayed task: " + taskId);
// 从队列中删除已处理的任务
redisTemplate.opsForZSet().remove(DELAYED_TASK_QUEUE, taskId);
}
}
}
}
代码解释
processDelayedTasks
方法:每隔 5 秒运行一次,定期检查Sorted Set
中是否有到期任务。- 当前时间
now
:获取当前的时间戳,用于判断任务的到期情况。 rangeByScore
方法:redisTemplate.opsForZSet().rangeByScore(DELAYED_TASK_QUEUE, 0, now)
从Sorted Set
中获取所有score
小于等于当前时间戳的任务,这些任务即已到期。- 遍历和处理任务:对于每个已到期的任务,执行相应的处理逻辑(这里以输出日志表示任务的处理过程)。
- 移除任务:使用
remove
方法将已处理的任务从Sorted Set
中删除,防止任务被重复处理。
- 当前时间
processDelayedTasks
方法确保到期任务在预定时间后自动执行,并且通过定时检查保持任务调度的稳定性。
8.3.2.具体应用示例
在应用的某个部分,可以使用 DelayedTaskProducer
的 addDelayedTask
方法来添加任务。例如,可以在控制器或服务中使用此方法:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class DelayedTaskInitializer {
@Autowired
private DelayedTaskProducer delayedTaskProducer;
@PostConstruct
public void init() {
// 在应用启动时,添加一些延迟任务示例
delayedTaskProducer.addDelayedTask("task1", 10); // 延迟10秒执行
delayedTaskProducer.addDelayedTask("task2", 20); // 延迟20秒执行
delayedTaskProducer.addDelayedTask("task3", 30); // 延迟30秒执行
}
}
代码解释
@PostConstruct
注解:init
方法会在DelayedTaskInitializer
Bean 初始化完成后自动执行,因此应用启动时会自动添加示例任务。- 任务添加:
addDelayedTask
方法将三个任务分别添加到延迟队列中,它们的到期时间分别是 10 秒、20 秒和 30 秒。
8.3.3.执行流程总结
1.添加延迟任务:
- 在应用启动时,
DelayedTaskInitializer
中的init
方法通过@PostConstruct
自动执行,添加延迟任务task1
、task2
和task3
。 - 每个任务会被插入 Redis
Sorted Set
中,且带有不同的到期时间。
2.定时检测任务:
DelayedTaskScheduler
中的processDelayedTasks
方法每 5 秒执行一次。- 在每次执行时,
processDelayedTasks
会检查Sorted Set
中所有分数小于等于当前时间的任务,并将这些任务视为到期任务。
3.任务处理和清理:
- 对于每个到期任务,调度器会执行处理逻辑(如打印任务 ID),并将任务从
Sorted Set
中移除,避免任务被重复执行。
9. 性能优化与监控
9.1. 连接池配置
连接池可以帮助管理 Redis 和 MySQL 的连接资源,避免频繁建立和销毁连接,从而减少系统开销,提高效率。合理的连接池配置可以防止资源耗尽,确保系统稳定运行。
9.1.1 Redis 连接池配置
在 Spring Boot 中可以通过 lettuce
或 jedis
来配置 Redis 连接池,以下示例基于 lettuce
配置连接池参数。
# Redis 连接配置
spring.redis.host=localhost
spring.redis.port=6379
# Lettuce 连接池配置
spring.redis.lettuce.pool.max-active=10 # 最大连接数
spring.redis.lettuce.pool.max-idle=5 # 最大空闲连接数
spring.redis.lettuce.pool.min-idle=2 # 最小空闲连接数
spring.redis.lettuce.pool.max-wait=1000 # 最大等待时间(毫秒)
代码解释
max-active
:最大连接数,设置 Redis 可以同时保持的最大连接数。max-idle
:最大空闲连接数,连接池中可以保持的最大空闲连接数,超过的连接将会被释放。min-idle
:最小空闲连接数,保持的最少空闲连接数,当连接数低于此值时会创建新连接。max-wait
:最大等待时间,连接池获取连接的等待时间(毫秒),如果超过时间未获取到连接,则抛出异常。
通过合理配置连接池参数,可以避免 Redis 的连接池资源耗尽,提高应用的并发处理能力。
9.1.2 MySQL 连接池配置
Spring Boot 默认使用 HikariCP 作为 MySQL 数据库的连接池,可以通过以下配置进行优化。
# MySQL 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/mydatabase
spring.datasource.username=root
spring.datasource.password=password
# HikariCP 连接池配置
spring.datasource.hikari.maximum-pool-size=10 # 最大连接数
spring.datasource.hikari.minimum-idle=2 # 最小空闲连接数
spring.datasource.hikari.idle-timeout=30000 # 空闲连接的最大存活时间
spring.datasource.hikari.connection-timeout=30000 # 获取连接的最大等待时间(毫秒)
spring.datasource.hikari.max-lifetime=1800000 # 连接的最大存活时间(毫秒)
代码解释
maximum-pool-size
:最大连接数,即连接池中允许的最大连接数。minimum-idle
:最小空闲连接数,保持的最少空闲连接数,低于此值时会创建新连接。idle-timeout
:空闲连接的最大存活时间,超过此时间的空闲连接将被释放。connection-timeout
:连接池中获取连接的最大等待时间,如果超过此时间未获取到连接,则抛出异常。max-lifetime
:连接的最大存活时间,避免长时间使用的连接失效。
通过配置 HikariCP 参数,可以更好地管理数据库连接池,减少系统在高并发场景下的连接资源问题。
9.2. 性能监控
性能监控可以帮助实时查看系统运行状态,识别性能瓶颈。可以使用 Spring Boot Actuator、Redis CLI 和 MySQL 监控工具来跟踪关键指标,如缓存命中率、连接池状态、数据库响应时间等。
9.2.1 使用 Spring Boot Actuator
Spring Boot Actuator 提供了丰富的监控端点,可以帮助开发者快速了解应用的运行状态。
<!-- 引入 Spring Boot Actuator 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置 Actuator 端点
在 application.properties
中启用 Actuator 所需的端点,以便查看 Redis 和 MySQL 的状态。
management.endpoints.web.exposure.include=health,metrics,beans
management.endpoint.health.show-details=always
使用端点
/actuator/health
:查看应用的健康状态,包含 Redis 和 MySQL 连接状态。/actuator/metrics
:查看应用的各种性能指标,如内存使用、CPU、线程等信息。
9.2.2 Redis CLI 监控 Redis 状态
Redis 提供了 INFO
命令,可以查看缓存的命中率、内存使用等详细信息。
redis-cli INFO
输出中的重要字段:
keyspace_hits
和keyspace_misses
:表示缓存命中和未命中的次数,可以计算缓存命中率:命中率 = keyspace_hits / (keyspace_hits + keyspace_misses)
。connected_clients
:当前连接到 Redis 的客户端数量。used_memory
:Redis 使用的内存总量。expired_keys
:过期的键数量,有助于判断是否需要优化键的过期策略。
9.2.3 MySQL 监控工具
可以使用 MySQL 的 SHOW STATUS
命令来监控 MySQL 的性能指标,查看连接数、查询数等。
SHOW GLOBAL STATUS;
常用的状态字段:
Threads_connected
:当前连接的客户端数量。Connections
:成功连接的总次数。Queries
:服务器处理的查询总数。Slow_queries
:执行时间超过long_query_time
的慢查询总数。
这些信息有助于优化数据库性能,如增加连接池大小或优化查询语句。
9.3. Redis Keyspace Notifications
Redis Keyspace Notifications 是 Redis 提供的键变动通知功能,可以配置 Redis 将键的变动(如过期、删除、更新等)通知到客户端,以便实时监控缓存变化。
9.3.1.配置 Redis Keyspace Notifications
可以在 Redis 配置文件(redis.conf
)中启用 Keyspace Notifications,或使用命令行动态配置。
# 监听所有键的过期事件和删除事件
config set notify-keyspace-events Ex
Ex
:表示启用键的过期事件和删除事件通知。K
:表示所有键(keyspace)事件的通知。
9.3.2.Spring Boot 中使用 Redis Keyspace Notifications
在 Spring Boot 中使用 Redis Keyspace Notifications,可以创建一个监听器来接收 Redis 键变动的通知事件。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
public class RedisConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 监听所有键的过期事件
container.addMessageListener(listenerAdapter, new PatternTopic("__keyevent@0__:expired"));
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter() {
return new MessageListenerAdapter(new RedisKeyExpirationListener());
}
public static class RedisKeyExpirationListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
System.out.println("Key expired: " + expiredKey);
// 在这里处理键过期事件,例如日志记录或重新加载缓存
}
}
}
代码解释
RedisMessageListenerContainer
:配置 Redis 消息监听器容器,允许监听 Redis 中的事件。PatternTopic("__keyevent@0__:expired")
:监听 Redis 中expired
事件,即键过期事件。@0
表示 Redis 数据库索引。RedisKeyExpirationListener
:自定义监听器,监听 Redis 键的过期事件。收到过期事件时会调用onMessage
方法,并打印或记录过期的键。
通过 Keyspace Notifications,可以实时监控 Redis 键的变化,如缓存失效、删除等事件,适合于对缓存内容有严格要求的场景。
10. 应用场景实践
10.1.用户登录与会话管理
在用户登录和会话管理中,通过 Redis 缓存会话信息,可以提高会话的管理效率。Redis 的高性能访问、自动过期机制以及丰富的数据结构,特别适合会话管理。
10.1.1.用户登录与会话管理的实现流程
- 用户登录时创建会话:用户成功登录后,将用户信息存入 Redis 缓存中,生成唯一的会话 ID 并设置有效期。
- 访问控制:每次用户请求时,通过会话 ID 检查 Redis 中的会话信息,确保用户的会话有效。
- 会话续期:当用户在会话有效期内操作时,可以自动延长会话有效期。
- 会话删除:当用户登出或会话超时时,从 Redis 中移除会话信息。
10.1.2. 创建用户会话
当用户登录成功时,调用 createSession
方法,在 Redis 中创建会话信息,并设置过期时间。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class SessionService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String SESSION_KEY_PREFIX = "session:"; // 会话键前缀
private static final long SESSION_TIMEOUT = 30; // 会话超时时间(分钟)
/**
* 创建用户会话
* @param sessionId 会话ID
* @param userInfo 用户信息对象
*/
public void createSession(String sessionId, Object userInfo) {
String sessionKey = SESSION_KEY_PREFIX + sessionId;
// 存储会话信息,并设置超时时间
redisTemplate.opsForValue().set(sessionKey, userInfo, SESSION_TIMEOUT, TimeUnit.MINUTES);
System.out.println("Session created: " + sessionId);
}
}
代码解释
RedisTemplate
:Spring Boot 提供的 Redis 操作模板,简化了与 Redis 的交互。SESSION_KEY_PREFIX
:会话键的前缀,所有会话数据的键都带有该前缀,以便在 Redis 中区分不同的数据类型。sessionId
作为唯一标识符,将用户信息与特定的会话关联。SESSION_TIMEOUT
:会话的超时时间,在用户登录成功后,将会话信息存入 Redis,并设置超时时间(如 30 分钟),超过时间后会话自动失效。createSession
方法:sessionKey
:Redis 中存储的键,用sessionId
生成独特键名(例如session:user123
)。redisTemplate.opsForValue().set(...)
:将sessionKey
与userInfo
对象绑定,并设置超时时间,使用TimeUnit.MINUTES
表示 30 分钟后自动过期。
当用户登录成功时,调用 createSession
,将会话信息存储到 Redis 中。因为 Redis 高性能访问特性,这样的会话存储操作非常迅速,适合高并发的登录场景。
10.1.3. 获取会话信息
在每次用户请求时,调用 getSession
方法,通过 Redis 查询用户的会话信息,以便验证会话是否有效。
/**
* 获取用户会话信息
* @param sessionId 会话ID
* @return 用户信息对象
*/
public Object getSession(String sessionId) {
String sessionKey = SESSION_KEY_PREFIX + sessionId;
return redisTemplate.opsForValue().get(sessionKey);
}
代码解释
getSession
方法:- 根据
sessionId
构造sessionKey
。 redisTemplate.opsForValue().get(sessionKey)
从 Redis 中获取用户信息。- 返回的用户信息对象(
userInfo
)将用于验证用户的身份、权限等。 - 如果返回
null
,表示会话已过期或无效。
- 根据
通过 getSession
可以快速获取会话数据,验证会话有效性。
10.1.4. 延长会话有效期
当用户在会话有效期内进行操作时,可以调用 extendSession
方法,延长会话的过期时间,避免用户频繁重新登录。
/**
* 延长用户会话有效期
* @param sessionId 会话ID
*/
public void extendSession(String sessionId) {
String sessionKey = SESSION_KEY_PREFIX + sessionId;
redisTemplate.expire(sessionKey, SESSION_TIMEOUT, TimeUnit.MINUTES);
System.out.println("Session extended: " + sessionId);
}
代码解释
extendSession
方法:redisTemplate.expire(...)
:重新设置会话的过期时间,延长SESSION_TIMEOUT
分钟。- 每次调用
extendSession
时,Redis 会更新会话的过期时间。 - 在用户有操作时,通过定期延长有效期,可以有效提高用户体验,避免会话自动过期。
10.1.5. 删除会话信息
在用户主动登出时,调用 deleteSession
删除会话信息,从 Redis 中移除该会话数据。
/**
* 删除用户会话
* @param sessionId 会话ID
*/
public void deleteSession(String sessionId) {
String sessionKey = SESSION_KEY_PREFIX + sessionId;
redisTemplate.delete(sessionKey);
System.out.println("Session deleted: " + sessionId);
}
代码解释
deleteSession
方法:redisTemplate.delete(sessionKey)
:根据会话 ID 删除 Redis 中对应的会话数据。- 当用户退出登录时,删除会话信息,避免无效会话数据滞留在 Redis 中,节省内存空间。
通过 Redis 的自动过期和删除机制,可以实现灵活的会话管理,当用户不再活动或主动退出时,相关会话数据会被自动清理。
10.1.6.应用场景与优势
使用 Redis 管理用户会话可以显著提高系统的响应速度,特别适用于大规模用户并发登录的场景:
- 高性能访问:Redis 的内存存储和快速响应适合会话管理,确保用户会话数据的高效读写。
- 自动清理:利用 Redis 的过期机制,自动清理超时会话,避免存储无效数据。
- 支持分布式部署:在分布式应用中,Redis 提供的会话存储可以在不同节点之间共享,确保会话的一致性和持久性。
10.2.购物车与库存管理
在电商系统中,购物车与库存管理是高并发场景的核心需求。Redis 提供的哈希结构和原子性操作,非常适合实现高效的购物车管理和库存扣减操作。通过 Redis,可以实现购物车的快速更新,确保用户购物体验流畅,同时利用原子性操作保证库存扣减的一致性。
10.2.1.购物车与库存管理的实现流程
- 购物车管理:每个用户的购物车可以通过 Redis 哈希结构进行存储,以用户 ID 为前缀的键名为哈希键名,商品 ID 为字段,商品数量为字段值。
- 库存管理:通过 Redis 的
incrBy
方法进行库存操作,可以确保库存扣减的原子性。 - 事务控制:在 Redis 中使用事务操作,保证添加商品到购物车和库存扣减的一致性。
我们通过一个 ShoppingCartService
类来实现购物车和库存的操作,包括添加商品到购物车、扣减库存以及将两者操作结合的事务控制。
10.2.2. 购物车管理
Redis 的哈希结构可以将每个用户的购物车作为一个哈希表,键名是 cart:{userId}
,字段是商品 ID,字段值是商品数量。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class ShoppingCartService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String CART_KEY_PREFIX = "cart:"; // 购物车键前缀
/**
* 添加商品到购物车
* @param userId 用户ID
* @param productId 商品ID
* @param quantity 数量
*/
public void addToCart(String userId, String productId, int quantity) {
String cartKey = CART_KEY_PREFIX + userId;
redisTemplate.opsForHash().increment(cartKey, productId, quantity); // 累加商品数量
System.out.println("Product added to cart: " + productId + ", Quantity: " + quantity);
}
/**
* 查看购物车
* @param userId 用户ID
* @return 购物车内容
*/
public Map<Object, Object> viewCart(String userId) {
String cartKey = CART_KEY_PREFIX + userId;
return redisTemplate.opsForHash().entries(cartKey);
}
}
代码解释
CART_KEY_PREFIX
:购物车键的前缀,用于区分用户的购物车数据。例如,用户user1
的购物车键名为cart:user1
。addToCart
方法:- 使用
increment
方法增加商品数量,如果购物车中已有此商品,会累加数量;如果没有,则会新建该商品的记录。 - 例如:用户
user1
购买商品product123
数量为2
时,键cart:user1
中字段product123
的值会累加 2。
- 使用
viewCart
方法:使用entries
获取购物车的所有内容。每个购物车都是一个哈希表,包含商品 ID 和数量的键值对。
在 Redis 中,哈希结构不仅减少了存储空间,还能高效处理购物车中多个商品的数据。
10.2.3. 库存管理
通过 Redis 的原子操作 increment
,可以确保库存扣减的操作具有原子性,从而避免超卖问题。每次扣减库存前都会检查是否有足够的库存量。
private static final String STOCK_KEY_PREFIX = "stock:"; // 库存键前缀
/**
* 减少商品库存
* @param productId 商品ID
* @param quantity 数量
* @return 是否成功
*/
public boolean deductStock(String productId, int quantity) {
String stockKey = STOCK_KEY_PREFIX + productId;
Long stock = redisTemplate.opsForValue().increment(stockKey, -quantity); // 扣减库存
if (stock != null && stock >= 0) {
System.out.println("Stock deducted for product: " + productId + ", Remaining: " + stock);
return true;
} else {
// 库存不足,回滚扣减操作
redisTemplate.opsForValue().increment(stockKey, quantity);
System.out.println("Stock not enough for product: " + productId);
return false;
}
}
代码解释
STOCK_KEY_PREFIX
:库存键的前缀,例如stock:product123
表示商品product123
的库存。deductStock
方法:- 使用
increment
方法原子性地减少库存。-quantity
表示扣减库存。 - 如果库存足够,返回
true
表示扣减成功;如果库存不足,库存值会回滚,即重新加回quantity
,确保不会产生负库存。 - 这种原子性操作能有效避免高并发下的超卖问题。
- 使用
10.2.4. 事务控制:添加商品到购物车并扣减库存
在实际场景中,添加商品到购物车和库存扣减应作为一个事务进行控制。我们可以通过 Redis 的 watch
机制,监视库存变化,并确保购物车和库存操作的一致性。
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
public void addToCartAndDeductStock(String userId, String productId, int quantity) {
redisTemplate.execute(new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations) {
String cartKey = CART_KEY_PREFIX + userId;
String stockKey = STOCK_KEY_PREFIX + productId;
// 监视库存键,确保库存在操作过程未被其他操作修改
operations.watch(stockKey);
// 检查库存是否充足
Long stock = (Long) operations.opsForValue().get(stockKey);
if (stock == null || stock < quantity) {
System.out.println("Stock not enough for product: " + productId);
return false; // 库存不足,操作终止
}
operations.multi(); // 开启事务
operations.opsForHash().increment(cartKey, productId, quantity); // 添加商品到购物车
operations.opsForValue().increment(stockKey, -quantity); // 扣减库存
return operations.exec(); // 执行事务
}
});
}
代码解释
watch(stockKey)
:通过watch
监视库存键stockKey
,确保在事务执行过程中,库存不会被其他操作修改。- 库存检查:在事务执行前,先检查 Redis 中的库存是否足够,若不足则终止操作。
multi()
和exec()
:启动 Redis 事务,先将购物车和库存扣减的指令放入队列,调用exec
执行事务。若期间库存发生变化,事务将失败,以确保库存和购物车数据的一致性。- 事务执行逻辑:
- 使用
increment
将商品添加到购物车。 - 同时,使用
increment
原子性地减少库存数量。 - 通过事务控制,确保购物车添加和库存扣减的一致性,防止在高并发情况下库存不足时的错误扣减问题。
- 使用
10.2.5.应用场景与优势
使用 Redis 进行购物车和库存管理,适合高并发电商场景:
- 高效购物车管理:Redis 哈希结构可以快速更新和查询购物车中的商品,减少数据库访问压力。
- 原子性库存操作:Redis 的原子操作确保了库存扣减的并发安全性,避免超卖问题。
- 事务控制:通过 Redis 事务,可以确保购物车和库存的操作一致性,避免出现库存不足却添加到购物车的情况。
10.3.数据统计与排行榜
在社交应用、游戏等场景中,经常需要实现实时的数据统计和排行榜功能。Redis 的 Sorted Set
数据结构通过分数排序,为实时更新和快速查询排名提供了理想的解决方案。在 Sorted Set
中,元素是有序的,且可以根据分数动态调整排名,非常适合排行榜、得分统计等需求。
10.3.1.数据统计与排行榜的实现流程
- 添加或更新分数:将用户的得分(或其他数据统计值)添加到
Sorted Set
中。如果用户已存在则更新分数,如果用户不存在则添加用户。 - 获取排行榜:通过
Sorted Set
的排序特性,可以快速获取前 N 名用户,形成实时排行榜。 - 查询用户排名:Redis 提供的
rank
操作可以查询特定用户的当前排名,实现用户排名的实时查询。
以下代码以一个简单的 LeaderboardService
为例,展示了排行榜的增、查操作,包括添加或更新分数、获取前 N 名以及获取用户的实时排名。
10.3.2. 添加或更新分数
使用 Redis 的 ZADD
操作将用户和分数存入 Sorted Set
中。如果用户已存在,则更新分数;如果用户不存在,则添加用户。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class LeaderboardService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String LEADERBOARD_KEY = "leaderboard"; // 排行榜键
/**
* 添加或更新用户分数
* @param userId 用户ID
* @param score 新增分数
*/
public void addOrUpdateScore(String userId, double score) {
redisTemplate.opsForZSet().incrementScore(LEADERBOARD_KEY, userId, score);
System.out.println("Score updated for user: " + userId + ", Score: " + score);
}
}
代码解释
LEADERBOARD_KEY
:用于存储排行榜的Sorted Set
键名,可以使用业务相关的名称来区分不同排行榜(例如:积分榜、竞赛榜等)。addOrUpdateScore
方法:- 使用
incrementScore
方法添加或更新用户的分数。 userId
表示用户的唯一标识符,score
表示得分。- 如果用户已存在于
Sorted Set
中,分数会累加;如果用户不存在则新增该用户的分数记录。 - 每次分数变动时,
Sorted Set
会根据分数自动重新排序,从而确保排行榜始终有序。
- 使用
10.3.3. 获取排行榜
可以通过 Redis 的 ZRANGE
操作,按分数从高到低获取前 N 名用户,并构成排行榜。此方法适用于展示全局排名。
/**
* 获取排行榜
* @param topN 取前N名
* @return 排名前N的用户ID列表
*/
public Set<Object> getTopN(int topN) {
return redisTemplate.opsForZSet().reverseRange(LEADERBOARD_KEY, 0, topN - 1);
}
代码解释
getTopN
方法:- 使用
reverseRange
获取Sorted Set
中前 N 名用户的数据。reverseRange
会按分数从高到低排序。 topN
参数指定需要获取的前 N 名用户。- 返回的
Set<Object>
中包含前 N 名用户的 ID,可以根据需要进一步查询用户详细信息。
- 使用
这种方式可以在排行榜页面中直接显示前 N 名用户的实时排名,非常适合用作游戏积分榜、社交平台活跃用户榜等。
10.3.4. 获取用户排名
为了查询特定用户的当前排名,可以使用 Redis 的 ZREVRANK
操作,通过分数从高到低排列,返回用户的名次。
/**
* 获取用户排名
* @param userId 用户ID
* @return 用户排名(1为最高)
*/
public Long getUserRank(String userId) {
Long rank = redisTemplate.opsForZSet().reverseRank(LEADERBOARD_KEY, userId);
if (rank != null) {
System.out.println("Rank for user " + userId + ": " + (rank + 1));
}
return rank != null ? rank + 1 : null; // 排名从0开始,需加1
}
代码解释
getUserRank
方法:- 使用
reverseRank
查询指定用户的排名,从分数从高到低排序中获取用户的索引值。 - 如果用户存在,则返回的索引值加 1 表示用户的实际排名。例如,索引值
0
表示第一名,故rank + 1
为最终排名。 - 如果用户不存在,返回
null
表示用户未上榜。
- 使用
通过实时查询用户排名,可以向用户展示其当前排名位置,增加用户的竞争体验。
10.4.5. 获取用户的分数
有时除了查询用户排名,还需要查询用户的得分。可以使用 Redis 的 ZSCORE
操作获取用户的分数信息。
/**
* 获取用户的分数
* @param userId 用户ID
* @return 用户的分数
*/
public Double getUserScore(String userId) {
Double score = redisTemplate.opsForZSet().score(LEADERBOARD_KEY, userId);
if (score != null) {
System.out.println("Score for user " + userId + ": " + score);
}
return score;
}
代码解释
getUserScore
方法:- 使用
score
获取Sorted Set
中指定用户的分数。 - 如果用户存在,返回其当前分数;如果用户不存在,返回
null
。 - 分数信息可以在用户详情页面、个人排行榜等场景中展示,帮助用户了解自己的得分情况。
- 使用
10.4.6.应用场景与优势
使用 Redis 的 Sorted Set
实现排行榜和实时数据统计,有以下优势:
- 实时性强:Redis 的分数排序机制使得排行榜能够实时更新,即用户分数更新后立即调整排名。
- 高并发性能:Redis 基于内存的数据结构和原子性操作,能够在高并发场景下保持高性能,特别适合游戏、社交平台等。
- 操作简洁:通过
ZADD
、ZRANGE
、ZSCORE
等操作,能够快速实现排名查询和数据统计,降低开发复杂度。
应用场景
- 游戏积分排行榜:展示游戏玩家的得分和排名,实时更新玩家的竞争状态。
- 社交平台活跃度排名:根据活跃度(如发帖、点赞、评论等)进行排序,鼓励用户保持活跃。
- 商户销售排名:电商平台可以基于商户的销售额,展示月度或年度销售排行榜,激励商家提升业绩。