当前位置: 首页 > article >正文

redis中使用pipeline

在操作数据库时,为了加快程序的执行速度,在新增或更新数据时,可以通过批量提交的方式来减少应用和数据库间的传输次数;在redis中也有这样的技术实现批量处理,也就是管道——Pipeline。它也是通过批量提交数据的方式来实现的,将要执行的redis命令提交到pipeline中,pipeline一次性的将数据发送给服务器,服务器再逐条执行命令。在执行命令过程中不是原子性的,可以插入其他命令执行。
下面演示在jedis中使用管道:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.3.0</version>
</dependency>

先通过一个测试示例代码看一下运行时间差异:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;

import java.time.Duration;

public class JedisUtil {

    /**
     * 连接地址
     */
    private String host;
    /**
     * 连接端口号
     */
    private int port;
    /**
     * 密码
     */
    private String password;
    /**
     * 连接池
     */
    private JedisPool jedisPool;

    /**
     * 连接初始化
     * @param host
     * @param port
     * @param password
     */
    public JedisUtil(String host, int port, String password) {
        this.host = host;
        this.port = port;
        this.password = password;

        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(256);
        config.setMaxIdle(256);
        config.setMinIdle(1);
        config.setMaxWait(Duration.ofMillis(300));

        if(password != null && !"".equals(password)) {
            jedisPool = new JedisPool(config, host, port, 500, password);
        } else {
            jedisPool = new JedisPool(config, this.host, this.port, 500);
        }
    }

    /**
     * 关闭连接池
     */
    public void close() {
        if(jedisPool != null && !jedisPool.isClosed()) {
            jedisPool.clear();
            jedisPool.close();
        }
    }

    /**
     * 获取连接
     * @return
     */
    public Jedis getJedis() {
        if(jedisPool != null && !jedisPool.isClosed()) {
            return jedisPool.getResource();
        }
        return null;
    }

    /**
     * 归还jedis对象
     * @param jedis
     */
    public void returnJedis(Jedis jedis) {
        if(jedis != null) {
            jedis.close();
        }
    }

    public static void main(String[] args) {
        // 获取jedis连接
        JedisUtil util = new JedisUtil("192.168.56.101", 6379, "");
        Jedis jedis = util.getJedis();

        // 设置键的数量:100万
        int KEY_COUNT = 1_000_000;

        // 普通方式set
        long start1 = System.currentTimeMillis();
        for(int i = 0; i < KEY_COUNT; i++) {
            jedis.set("key1_" + i, "value1_" + i);
        }
        System.out.println("use time : " + (System.currentTimeMillis() - start1) + "ms");

        // 清理数据库的key,线上系统不要使用
        jedis.flushDB();
        
        // 使用管道set
        long start2 = System.currentTimeMillis();
        Pipeline pipeline = jedis.pipelined();
        int num = 0;
        for(int i = 0; i < KEY_COUNT; i++) {
            pipeline.set("key2_" + i, "value2_" + i);
            if(num++ >= 200) {
//                pipeline.syncAndReturnAll();
                pipeline.sync();
                pipeline.close();

                pipeline = jedis.pipelined();
                num = 0;
            }
        }
        if(num != 0) {
            pipeline.syncAndReturnAll();
            pipeline.close();
        }
        System.out.println("pipeline : " + (System.currentTimeMillis() - start2) + "ms");

        // 清理数据库的key,线上系统不要使用
        jedis.flushDB();
    }
}

上面的代码运行两次,调整先后顺序分别运行,得到的运行时间:

use time : 79297ms
pipeline : 2036ms

pipeline : 1747ms
use time : 85078ms

可以看到两次运行的时间差异还是非常明显的,基本上差距40~50倍,再实际运行时可以多次测试并调整每次pipeline提交命令的条数,找到每次提交数据时性能最好的数据条数。pipeline每次提交数据量不宜过多,太多的命令一次提交会导致客户端等待结果时间比较长,也会让连接的缓冲区数据量过大。

pipeline本身没有过多内容需要讲解,下面介绍一下如何在redisTemplate中使用pipeline,redisTemplate中已经提供了对应方法executePipelined()可以直接调用,它支持两个类型的参数:RedisCallback更接近redis原生命令,但是需要自己将键和值都转换为字节码传递过去;SessionCallback对操作进行了封装,可以根据操作不同的数据类型进行转换,方便api使用。

List<Object> datas = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        connection.set("key1".getBytes(StandardCharsets.UTF_8), "value1".getBytes(StandardCharsets.UTF_8));
        connection.set("key2".getBytes(StandardCharsets.UTF_8), "value2".getBytes(StandardCharsets.UTF_8));
        connection.set("key3".getBytes(StandardCharsets.UTF_8), "value3".getBytes(StandardCharsets.UTF_8));
        connection.set("key4".getBytes(StandardCharsets.UTF_8), "value4".getBytes(StandardCharsets.UTF_8));
        connection.set("key5".getBytes(StandardCharsets.UTF_8), "value5".getBytes(StandardCharsets.UTF_8));
        connection.set("key6".getBytes(StandardCharsets.UTF_8), "value6".getBytes(StandardCharsets.UTF_8));
        connection.get("key1".getBytes(StandardCharsets.UTF_8));

        // 这里必须返回null,在 connection.closePipeline() 时覆盖原来的返回值,所以返回值没有必要设置,设置会报错
        return null;
    }
});
List<Object> datas = redisTemplate.executePipelined(new SessionCallback<Object>() {
    @Override
    public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
        ValueOperations<String, String> op1 = (ValueOperations<String, String>) operations.opsForValue();
        op1.set("key7", "value7");
        op1.set("key8", "value8");
        op1.get("key2");

        SetOperations<String, String> op2 = (SetOperations<String, String>) operations.opsForSet();
        op2.add("set_demo", "value1", "value2", "value3");
        op2.randomMember("set_demo");

        return null;
    }
});

pipeline非常显著的提升系统性能,对于redis这种内存数据库,每天的请求量会非常高,对于系统优化来说,管道技术的使用应该成为代码的一个优化点。


http://www.kler.cn/a/160104.html

相关文章:

  • 一文了解Android中的AudioFlinger
  • 3588 yolov8 onnx 量化转 rknn 并运行
  • 「QT」窗口类 之 QWidget 窗口基类
  • 【深度学习】学习率介绍(torch.optim.lr_scheduler学习率调度策略介绍)
  • ABAP关于PS模块CJ20N中项目物料的屏幕和字段增强CI_RSADD
  • STM32中,不进行printf改写通过函数达到同款效果
  • Qt Rsa 加解密方法使用(pkcs1, pkcs8, 以及文件存储和内存存储密钥)
  • 对于多台232modbus仪表低成本通讯的modbus转profinet网关
  • 微服务开发:断路器详解
  • 卡码网语言基础课 | 20. 排队取奶茶
  • Vue的methods中定时器的变量报错问题
  • 十年JK无人知!一朝泳衣天下识
  • 【数据结构】——二叉树特点
  • 区块链创新应用场景不断拓展,实现去中心化
  • 前端三大MV*模式:MVC、mvvm、mvp模式介绍
  • 数据库的设计规范
  • Element-UI 动态控制输入组件类型,定义代码组件、前端模板
  • 02数仓平台Zookeeper
  • prime靶机打靶记录
  • 数字化转型:利用软件电商平台与私有化软件提升竞争力
  • C++ 共享内存ShellCode跨进程传输
  • 54.多级缓存
  • 【PyTorch】数据集
  • 实战oj题——设计循环队列
  • 【Qt之QSqlRelationalTableModel】描述及使用
  • 【微信小程序】保存多张图片到本地相册 wx.saveImageToPhotosAlbum