Flink 热存储维表 使用 Guava Cache 减轻访问压力
目录
背景
Guava Cache 简介
实现方案
1. 项目依赖
2. Guava Cache 集成到 Flink
(1) 定义 Cache
(2) 使用 Cache 优化维表查询
3. 应用运行效果
(1) 维表查询逻辑优化
(2) 减少存储压力
Guava Cache 配置优化
总结
背景
在实时计算场景中,Flink 应用中经常需要通过维表进行维度数据的关联。为了保证关联的实时性,常将维表数据存储在 Redis 或数据库中。然而,这种方案可能会因高频访问导致存储压力过大,甚至出现性能瓶颈。
为了解决这个问题,可以在 Flink 中引入本地缓存。本文介绍如何通过 Google 的开源库 Guava Cache,实现对热存储维表访问的优化。
Guava Cache 简介
Guava Cache 是 Google 开发的一个 Java 缓存工具库,具有以下优点:
- 支持本地缓存,提升查询性能。
- 提供缓存淘汰策略(如基于时间或容量)。
- 线程安全,适合高并发场景。
- 提供监听机制,可在缓存失效时触发回调。
实现方案
1. 项目依赖
在 Maven 项目中引入 Guava 依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
2. Guava Cache 集成到 Flink
以下是一个典型的实现步骤:
(1) 定义 Cache
使用 Guava 提供的 CacheBuilder
创建一个本地缓存:
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
public class CacheUtil {
private static final Cache<String, String> DIM_CACHE = CacheBuilder.newBuilder()
.maximumSize(10000) // 最大缓存数量
.expireAfterWrite(10, TimeUnit.MINUTES) // 缓存过期时间
.build();
public static String getFromCache(String key) {
return DIM_CACHE.getIfPresent(key);
}
public static void putToCache(String key, String value) {
DIM_CACHE.put(key, value);
}
}
(2) 使用 Cache 优化维表查询
在自定义的 RichFlatMapFunction
中使用缓存查询维表数据:
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
public class DimensionJoinFunction extends RichFlatMapFunction<String, String> {
@Override
public void open(Configuration parameters) throws Exception {
// 初始化连接到 Redis 或其他外部存储
}
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String dimKey = extractKey(value);
// 1. 先查询缓存
String dimValue = CacheUtil.getFromCache(dimKey);
// 2. 如果缓存未命中,再查询外部存储
if (dimValue == null) {
dimValue = queryFromExternalStorage(dimKey);
if (dimValue != null) {
CacheUtil.putToCache(dimKey, dimValue); // 写入缓存
}
}
// 3. 关联维度数据
if (dimValue != null) {
String result = enrichData(value, dimValue);
out.collect(result);
}
}
private String extractKey(String value) {
// 从输入数据中提取维表关联键
return value.split(",")[0];
}
private String queryFromExternalStorage(String key) {
// 模拟查询 Redis 或数据库
return "mock_value_for_" + key;
}
private String enrichData(String input, String dimValue) {
// 组合维度数据
return input + "," + dimValue;
}
}
3. 应用运行效果
(1) 维表查询逻辑优化
- 缓存命中时:直接返回缓存数据,访问延迟为纳秒级。
- 缓存未命中时:查询外部存储,并将结果写入缓存,后续重复访问相同的 Key 时不再查询外部存储。
(2) 减少存储压力
Guava Cache 本地缓存避免了大量高频查询直接命中外部存储,降低了 Redis、MySQL 等服务的负载。
Guava Cache 配置优化
-
缓存淘汰策略:
expireAfterWrite
:基于写入时间自动过期。expireAfterAccess
:基于访问时间自动过期。maximumSize
:限制最大缓存数量,避免内存占用过高。
-
异步加载机制: 如果需要异步加载数据,可以使用
CacheLoader
,在缓存未命中时自动加载:Cache<String, String> cache = CacheBuilder.newBuilder() .maximumSize(10000) .build(new CacheLoader<String, String>() { @Override public String load(String key) throws Exception { return queryFromExternalStorage(key); } });
-
监控与统计: 使用
Cache.stats()
查看缓存命中率等统计数据,便于优化缓存策略。
总结
通过在 Flink 中引入 Guava Cache,可以显著降低热存储维表的访问压力,提升系统性能。
这种方案适用于维表数据更新频率较低,且查询热点相对集中的场景