【redis】pipeline管道
Redis Pipeline(管道)是一种将多个命令批量发送到服务器的技术。与逐个发送命令的传统方式不同,Pipeline允许客户端一次性打包多条命令,通过单次网络往返完成批量操作。
为什么需要Pipeline?
在常规操作模式下,每个Redis命令的执行流程为:
客户端发送命令 -> 服务器处理 -> 客户端接收响应
当执行N条命令时会产生N次网络往返延迟(RTT)。
而Pipeline通过批量发送机制:
客户端打包命令1,2,3... -> 服务器顺序处理 -> 批量返回响应
将网络开销从O(N)降低到O(1),在批量操作场景下可带来数十倍的性能提升。
适用场景:
-
批量写入/读取数据
-
需要执行多个无关联命令
-
高并发且对实时性要求不高的操作
Pipeline命令的使用
命令行使用
使用redis-cli
的--pipe
参数实现Pipeline:
# 生成测试命令数据(生成5条SET命令)
$ printf "SET key%d value%d\n" $(seq 1 10) > commands.txt
# 查询生成的测试命令数据
$ cat commands.txt
SET key1 value2
SET key3 value4
SET key5 value6
SET key7 value8
SET key9 value10
# 通过Pipeline执行
$ cat commands.txt | redis-cli --pipe
All data transferred. Waiting for the last reply...
Last reply received from server.
errors: 0, replies: 5
Java代码实现
使用Jedis客户端库示例:
package com.morris.redis.demo.pipeline;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
* jedis使用pipeline
*/
public class JedisPipelineDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Jedis jedis = new Jedis("localhost", 6379);
Pipeline pipeline = jedis.pipelined();
// 批量写入
for (int i = 1; i <= 10000; i++) {
pipeline.set("key" + i, "value" + i);
}
// 提交并获取响应(注意响应顺序与命令顺序一致)
List<Object> responses = pipeline.syncAndReturnAll();
System.out.println("写入完成,响应数量:" + responses.size());
}
}
性能对比测试
测试代码:
package com.morris.redis.demo.pipeline;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
/**
* pipeline和单命令执行性能对比
*/
public class PerformanceTestDemo {
public static final int count = 1000;
public static void main(String[] args) {
// 普通模式
long start = System.currentTimeMillis();
try (Jedis jedis = new Jedis("localhost", 6379)) {
for (int i = 0; i < count; i++) {
jedis.set("normal_" + i, "value");
}
}
long normalCost = System.currentTimeMillis() - start;
// Pipeline模式
start = System.currentTimeMillis();
try (Jedis jedis = new Jedis("localhost", 6379)) {
Pipeline pipeline = jedis.pipelined();
for (int i = 0; i < count; i++) {
pipeline.set("pipe_" + i, "value");
}
pipeline.sync();
}
long pipeCost = System.currentTimeMillis() - start;
System.out.printf("操作次数: %d | 普通模式: %dms | Pipeline模式: %dms\n",
count, normalCost, pipeCost);
}
}
测试结果:
操作次数: 1000 | 普通模式: 2784ms | Pipeline模式: 2140ms
操作次数: 10000 | 普通模式: 8315ms | Pipeline模式: 185ms
操作次数: 100000 | 普通模式: 52180ms | Pipeline模式: 2479ms
注意事项
-
批量大小控制:建议单次Pipeline命令数控制在10,000以内
-
非原子性:Pipeline不是事务,中间命令失败不会影响后续执行
-
内存消耗:服务端会将所有响应缓存在内存中,超大Pipeline可能引发OOM
-
错误处理:通过解析响应列表检查每个命令的执行结果