redis中使用pipeline
在操作数据库时,为了加快程序的执行速度,在新增或更新数据时,可以通过批量提交的方式来减少应用和数据库间的传输次数;在redis中也有这样的技术实现批量处理,也就是管道——Pipeline。它也是通过批量提交数据的方式来实现的,将要执行的redis命令提交到pipeline中,pipeline一次性的将数据发送给服务器,服务器再逐条执行命令。在执行命令过程中不是原子性的,可以插入其他命令执行。
下面演示在jedis中使用管道:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.0</version>
</dependency>
先通过一个测试示例代码看一下运行时间差异:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;
import java.time.Duration;
public class JedisUtil {
/**
* 连接地址
*/
private String host;
/**
* 连接端口号
*/
private int port;
/**
* 密码
*/
private String password;
/**
* 连接池
*/
private JedisPool jedisPool;
/**
* 连接初始化
* @param host
* @param port
* @param password
*/
public JedisUtil(String host, int port, String password) {
this.host = host;
this.port = port;
this.password = password;
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(256);
config.setMaxIdle(256);
config.setMinIdle(1);
config.setMaxWait(Duration.ofMillis(300));
if(password != null && !"".equals(password)) {
jedisPool = new JedisPool(config, host, port, 500, password);
} else {
jedisPool = new JedisPool(config, this.host, this.port, 500);
}
}
/**
* 关闭连接池
*/
public void close() {
if(jedisPool != null && !jedisPool.isClosed()) {
jedisPool.clear();
jedisPool.close();
}
}
/**
* 获取连接
* @return
*/
public Jedis getJedis() {
if(jedisPool != null && !jedisPool.isClosed()) {
return jedisPool.getResource();
}
return null;
}
/**
* 归还jedis对象
* @param jedis
*/
public void returnJedis(Jedis jedis) {
if(jedis != null) {
jedis.close();
}
}
public static void main(String[] args) {
// 获取jedis连接
JedisUtil util = new JedisUtil("192.168.56.101", 6379, "");
Jedis jedis = util.getJedis();
// 设置键的数量:100万
int KEY_COUNT = 1_000_000;
// 普通方式set
long start1 = System.currentTimeMillis();
for(int i = 0; i < KEY_COUNT; i++) {
jedis.set("key1_" + i, "value1_" + i);
}
System.out.println("use time : " + (System.currentTimeMillis() - start1) + "ms");
// 清理数据库的key,线上系统不要使用
jedis.flushDB();
// 使用管道set
long start2 = System.currentTimeMillis();
Pipeline pipeline = jedis.pipelined();
int num = 0;
for(int i = 0; i < KEY_COUNT; i++) {
pipeline.set("key2_" + i, "value2_" + i);
if(num++ >= 200) {
// pipeline.syncAndReturnAll();
pipeline.sync();
pipeline.close();
pipeline = jedis.pipelined();
num = 0;
}
}
if(num != 0) {
pipeline.syncAndReturnAll();
pipeline.close();
}
System.out.println("pipeline : " + (System.currentTimeMillis() - start2) + "ms");
// 清理数据库的key,线上系统不要使用
jedis.flushDB();
}
}
上面的代码运行两次,调整先后顺序分别运行,得到的运行时间:
use time : 79297ms
pipeline : 2036ms
pipeline : 1747ms
use time : 85078ms
可以看到两次运行的时间差异还是非常明显的,基本上差距40~50倍,再实际运行时可以多次测试并调整每次pipeline提交命令的条数,找到每次提交数据时性能最好的数据条数。pipeline每次提交数据量不宜过多,太多的命令一次提交会导致客户端等待结果时间比较长,也会让连接的缓冲区数据量过大。
pipeline本身没有过多内容需要讲解,下面介绍一下如何在redisTemplate中使用pipeline,redisTemplate中已经提供了对应方法executePipelined()可以直接调用,它支持两个类型的参数:RedisCallback更接近redis原生命令,但是需要自己将键和值都转换为字节码传递过去;SessionCallback对操作进行了封装,可以根据操作不同的数据类型进行转换,方便api使用。
List<Object> datas = redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.set("key1".getBytes(StandardCharsets.UTF_8), "value1".getBytes(StandardCharsets.UTF_8));
connection.set("key2".getBytes(StandardCharsets.UTF_8), "value2".getBytes(StandardCharsets.UTF_8));
connection.set("key3".getBytes(StandardCharsets.UTF_8), "value3".getBytes(StandardCharsets.UTF_8));
connection.set("key4".getBytes(StandardCharsets.UTF_8), "value4".getBytes(StandardCharsets.UTF_8));
connection.set("key5".getBytes(StandardCharsets.UTF_8), "value5".getBytes(StandardCharsets.UTF_8));
connection.set("key6".getBytes(StandardCharsets.UTF_8), "value6".getBytes(StandardCharsets.UTF_8));
connection.get("key1".getBytes(StandardCharsets.UTF_8));
// 这里必须返回null,在 connection.closePipeline() 时覆盖原来的返回值,所以返回值没有必要设置,设置会报错
return null;
}
});
List<Object> datas = redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
ValueOperations<String, String> op1 = (ValueOperations<String, String>) operations.opsForValue();
op1.set("key7", "value7");
op1.set("key8", "value8");
op1.get("key2");
SetOperations<String, String> op2 = (SetOperations<String, String>) operations.opsForSet();
op2.add("set_demo", "value1", "value2", "value3");
op2.randomMember("set_demo");
return null;
}
});
pipeline非常显著的提升系统性能,对于redis这种内存数据库,每天的请求量会非常高,对于系统优化来说,管道技术的使用应该成为代码的一个优化点。