Redis :redis的大Key问题
问题 : 什么是redis 的大key 呢?
redis 是一个单线程应用程序。他的请求类似于队列处理,命令排队执行,逐个处理。
这样就会出现一个问题,一旦队列前面的命令请求处理时间过程,那么后续执行命令就会被迫的等待。
请求处理过程中,如果涉及到的键或键关联的值数据量很大,Redis 针对这个请求的 I/O 操作耗时以及整体处理时间都将显著增加。
这种情况一旦发生,首先redis得吞吐量会下降,其他的命令排队等待时间也会增加。
问题:那么什么样的key 属于redis 的大key 呢?
string 类型的话,值不能大于5MB
zset 中 ,如果成员超过1万个
HASH 类型中,如果他的值总和超过100MB 我们也认为是一个大KEY了
问题:出现大key 问题如何解决?
第一种解决办法:
首先我们想到的就是对数据进行压缩,比如string类型 我们通常使用json ,我们可以对其进行压缩,比如使用Protocol Buffers 进行序列号存储,可以减少百分之50的空间使用。
第二种解决办法:
对大KEY进行拆分,拆成多个不同的字符串 ,存入不同的redis ,再取出数据进行拼接。
写入redis
// MetaInfo 定义数据元信息
type MetaInfo struct {
Data []byte `json:"data"` // 如果不是大Key,直接取这个字段,避免多次访问Redis
IsBigKey bool `json:"is_big_key"` // 标记是否是大Key,大Key需要按块存储
Keys []string `json:"keys"` // 子Key数组,用于存储分块数据的Key
}
// storeValueInRedis 将Value按字节大小拆分后存入Redis
func storeValueInRedis(ctx context.Context, key string, value []byte, chunkSize int) error {
// 计算需要多少个块(chunk)
totalChunks := (len(value) + chunkSize - 1) / chunkSize // 向上取整计算chunk数量
// 默认情况下存储为小Key
meta := MetaInfo{IsBigKey: false, Data: value}
// 如果需要分块存储(大Key)
if totalChunks > 1 {
// 生成数据版本号(MD5的后6位)
version := md5LastSixBytes(value)
keys := make([]string, 0, totalChunks)
// 创建Redis Pipeline,用于批量操作
pipe := redisClient.Pipeline()
// 按块存储数据
for i := 0; i < totalChunks; i++ {
// 计算当前块的开始和结束索引
start := i * chunkSize
end := (i + 1) * chunkSize
if end > len(value) {
end = len(value)
}
// 提取当前块数据
chunk := value[start:end]
// 构造当前块的Key,格式:主Key:版本号:块编号
chunkKey := fmt.Sprintf("%s:%s:%d", key, version, i)
keys = append(keys, chunkKey)
// 将当前块存入Pipeline
pipe.Set(ctx, chunkKey, chunk, 0)
}
// 执行Pipeline中的所有命令
if _, err := pipe.Exec(ctx); err != nil {
return fmt.Errorf("failed to store chunks in Redis: %w", err)
}
// 更新Meta信息为大Key
meta = MetaInfo{IsBigKey: true, Keys: keys, Data: nil}
}
// 将Meta信息序列化为JSON
metaByte, err := json.Marshal(meta)
if err != nil {
return fmt.Errorf("failed to marshal meta info: %w", err)
}
// 获取原来的Meta信息(用于设置过期时间)
oldMetaByte, err := redisClient.Get(ctx, key).Bytes()
if err != nil && err != redis.Nil { // 如果Key不存在,则忽略错误
return fmt.Errorf("failed to retrieve old meta info: %w", err)
}
// 将新Meta信息存入Redis
if _, err := redisClient.Set(ctx, key, metaByte, 0).Result(); err != nil {
return fmt.Errorf("failed to store new meta info: %w", err)
}
// 如果存在旧的Meta信息且是大Key
var oldMetaInfo MetaInfo
if err := json.Unmarshal(oldMetaByte, &oldMetaInfo); err == nil && oldMetaInfo.IsBigKey {
// TODO: 设置旧Key的过期时间(例如10分钟),避免影响正在使用的服务
// 可以遍历 oldMetaInfo.Keys 并设置每个子Key的过期时间
for _, oldKey := range oldMetaInfo.Keys {
// 设置过期时间为10分钟
if err := redisClient.Expire(ctx, oldKey, 10*time.Minute).Err(); err != nil {
return fmt.Errorf("failed to set expiration for old chunk key %s: %w", oldKey, err)
}
}
}
return nil
}
读取redis
// 从Redis获取数据
func getDataFromRedis(ctx context.Context, key string) ([]byte, error) {
// Step 1: 获取数据元信息
metaByte, err := redisClient.Get(ctx, key).Bytes()
if err != nil {
// 如果Redis中不存在该Key,返回错误
return nil, fmt.Errorf("failed to get metadata for key %s: %w", key, err)
}
// Step 2: 解析元信息为MetaInfo结构
var metaInfo MetaInfo
if err := json.Unmarshal(metaByte, &metaInfo); err != nil {
return nil, fmt.Errorf("failed to unmarshal metadata for key %s: %w", key, err)
}
// Step 3: 判断是否为大Key
if !metaInfo.IsBigKey {
// 如果不是大Key,直接返回Data字段中的数据
return metaInfo.Data, nil
}
// Step 4: 如果是大Key,使用Pipeline从多个子Key中获取数据
pipe := redisClient.Pipeline()
// 将每个子Key的Get操作添加到Pipeline中
for _, chunkKey := range metaInfo.Keys {
pipe.Get(ctx, chunkKey)
}
// 执行Pipeline中的所有命令
cmds, err := pipe.Exec(ctx)
if err != nil {
// 如果Pipeline执行失败,返回错误
return nil, fmt.Errorf("failed to execute pipeline for key %s: %w", key, err)
}
// Step 5: 拼接所有子Key的数据,生成完整的值
var data []byte
for i, cmd := range cmds {
// 检查每个命令的执行结果是否有错误
if cmd.Err() != nil {
return nil, fmt.Errorf("failed to get chunk %d for key %s: %w", i, key, cmd.Err())
}
// 获取子Key数据并追加到完整数据中
chunkData := []byte(cmd.(*redis.StringCmd).Val())
data = append(data, chunkData...)
}
// 返回拼接后的完整数据
return data, nil
}