当前位置: 首页 > article >正文

Redis :redis的大Key问题

问题 : 什么是redis 的大key 呢?

 redis 是一个单线程应用程序。他的请求类似于队列处理,命令排队执行,逐个处理。

这样就会出现一个问题,一旦队列前面的命令请求处理时间过程,那么后续执行命令就会被迫的等待。

请求处理过程中,如果涉及到的键或键关联的值数据量很大,Redis 针对这个请求的 I/O 操作耗时以及整体处理时间都将显著增加。

这种情况一旦发生,首先redis得吞吐量会下降,其他的命令排队等待时间也会增加。

问题:那么什么样的key 属于redis 的大key 呢?

string 类型的话,值不能大于5MB

zset 中 ,如果成员超过1万个

HASH 类型中,如果他的值总和超过100MB 我们也认为是一个大KEY了

问题:出现大key 问题如何解决?

第一种解决办法:

首先我们想到的就是对数据进行压缩,比如string类型 我们通常使用json ,我们可以对其进行压缩,比如使用Protocol Buffers 进行序列号存储,可以减少百分之50的空间使用。

第二种解决办法:

对大KEY进行拆分,拆成多个不同的字符串 ,存入不同的redis ,再取出数据进行拼接。

 写入redis 

// MetaInfo 定义数据元信息
type MetaInfo struct {
	Data     []byte   `json:"data"`     // 如果不是大Key,直接取这个字段,避免多次访问Redis
	IsBigKey bool     `json:"is_big_key"` // 标记是否是大Key,大Key需要按块存储
	Keys     []string `json:"keys"`     // 子Key数组,用于存储分块数据的Key
}

// storeValueInRedis 将Value按字节大小拆分后存入Redis
func storeValueInRedis(ctx context.Context, key string, value []byte, chunkSize int) error {
	// 计算需要多少个块(chunk)
	totalChunks := (len(value) + chunkSize - 1) / chunkSize // 向上取整计算chunk数量

	// 默认情况下存储为小Key
	meta := MetaInfo{IsBigKey: false, Data: value}

	// 如果需要分块存储(大Key)
	if totalChunks > 1 {
		// 生成数据版本号(MD5的后6位)
		version := md5LastSixBytes(value)
		keys := make([]string, 0, totalChunks)

		// 创建Redis Pipeline,用于批量操作
		pipe := redisClient.Pipeline()

		// 按块存储数据
		for i := 0; i < totalChunks; i++ {
			// 计算当前块的开始和结束索引
			start := i * chunkSize
			end := (i + 1) * chunkSize
			if end > len(value) {
				end = len(value)
			}

			// 提取当前块数据
			chunk := value[start:end]

			// 构造当前块的Key,格式:主Key:版本号:块编号
			chunkKey := fmt.Sprintf("%s:%s:%d", key, version, i)
			keys = append(keys, chunkKey)

			// 将当前块存入Pipeline
			pipe.Set(ctx, chunkKey, chunk, 0)
		}

		// 执行Pipeline中的所有命令
		if _, err := pipe.Exec(ctx); err != nil {
			return fmt.Errorf("failed to store chunks in Redis: %w", err)
		}

		// 更新Meta信息为大Key
		meta = MetaInfo{IsBigKey: true, Keys: keys, Data: nil}
	}

	// 将Meta信息序列化为JSON
	metaByte, err := json.Marshal(meta)
	if err != nil {
		return fmt.Errorf("failed to marshal meta info: %w", err)
	}

	// 获取原来的Meta信息(用于设置过期时间)
	oldMetaByte, err := redisClient.Get(ctx, key).Bytes()
	if err != nil && err != redis.Nil { // 如果Key不存在,则忽略错误
		return fmt.Errorf("failed to retrieve old meta info: %w", err)
	}

	// 将新Meta信息存入Redis
	if _, err := redisClient.Set(ctx, key, metaByte, 0).Result(); err != nil {
		return fmt.Errorf("failed to store new meta info: %w", err)
	}

	// 如果存在旧的Meta信息且是大Key
	var oldMetaInfo MetaInfo
	if err := json.Unmarshal(oldMetaByte, &oldMetaInfo); err == nil && oldMetaInfo.IsBigKey {
		// TODO: 设置旧Key的过期时间(例如10分钟),避免影响正在使用的服务
		// 可以遍历 oldMetaInfo.Keys 并设置每个子Key的过期时间
		for _, oldKey := range oldMetaInfo.Keys {
			// 设置过期时间为10分钟
			if err := redisClient.Expire(ctx, oldKey, 10*time.Minute).Err(); err != nil {
				return fmt.Errorf("failed to set expiration for old chunk key %s: %w", oldKey, err)
			}
		}
	}

	return nil
}

读取redis 

// 从Redis获取数据
func getDataFromRedis(ctx context.Context, key string) ([]byte, error) {
	// Step 1: 获取数据元信息
	metaByte, err := redisClient.Get(ctx, key).Bytes()
	if err != nil {
		// 如果Redis中不存在该Key,返回错误
		return nil, fmt.Errorf("failed to get metadata for key %s: %w", key, err)
	}

	// Step 2: 解析元信息为MetaInfo结构
	var metaInfo MetaInfo
	if err := json.Unmarshal(metaByte, &metaInfo); err != nil {
		return nil, fmt.Errorf("failed to unmarshal metadata for key %s: %w", key, err)
	}

	// Step 3: 判断是否为大Key
	if !metaInfo.IsBigKey {
		// 如果不是大Key,直接返回Data字段中的数据
		return metaInfo.Data, nil
	}

	// Step 4: 如果是大Key,使用Pipeline从多个子Key中获取数据
	pipe := redisClient.Pipeline()

	// 将每个子Key的Get操作添加到Pipeline中
	for _, chunkKey := range metaInfo.Keys {
		pipe.Get(ctx, chunkKey)
	}

	// 执行Pipeline中的所有命令
	cmds, err := pipe.Exec(ctx)
	if err != nil {
		// 如果Pipeline执行失败,返回错误
		return nil, fmt.Errorf("failed to execute pipeline for key %s: %w", key, err)
	}

	// Step 5: 拼接所有子Key的数据,生成完整的值
	var data []byte
	for i, cmd := range cmds {
		// 检查每个命令的执行结果是否有错误
		if cmd.Err() != nil {
			return nil, fmt.Errorf("failed to get chunk %d for key %s: %w", i, key, cmd.Err())
		}

		// 获取子Key数据并追加到完整数据中
		chunkData := []byte(cmd.(*redis.StringCmd).Val())
		data = append(data, chunkData...)
	}

	// 返回拼接后的完整数据
	return data, nil
}

 


http://www.kler.cn/a/506538.html

相关文章:

  • JAVA实现五子棋小游戏(附源码)
  • 网络安全技术深度解析与实践案例
  • HunyuanVideo 文生视频模型实践
  • 网络安全 | 什么是威胁情报?
  • 【cs.CV】25.1.15 arxiv更新速递
  • 内联变量(inline variables):在多个文件中共享全局常量
  • 复盘思维课程
  • Spring MVC核心组件与请求处理流程
  • BertTokenizerFast 和 BertTokenizer 的区别
  • 探索 AI 自动化编程:效率革命与未来教育的转型
  • Java 对象池管理的高性能工具库 Apache Commons Pool 2
  • 2.两数相加--力扣
  • tomcat文件目录讲解
  • 剑指Offer|LCR 031. LRU 缓存
  • Haskell语言的网络编程
  • 基于 Electron 应用的安全测试基础 — 提取和分析 .asar 文件
  • 【k8s面试题2025】1、练气期
  • 鸿蒙-点击Notification通知并打开App的具体页面
  • 动态规划汇总1
  • 服务器数据恢复—Zfs文件系统数据恢复案例
  • mongDB学习笔记
  • 基于Linux系统指令使用详细解析
  • 浅谈云计算18 | OpenStack架构概述
  • 自动化仓储管理与库存控制
  • 《零基础Go语言算法实战》【题目 4-11】在不使用任何内置散列表库的情况下设计一个 HashSet
  • 蓝桥杯刷题第三天——排序