模拟实战-用CompletableFuture优化远程RPC调用
实战场景
这是广州某500-900人互联网厂的面试原题
手写并发优化解决思路
我们要调用对方的RPC接口,我们的RPC接口每调用一次对方都会阻塞50ms
但是我们的业务要批量调用RPC,例如我们要批量调用1k次,我们不可能在for循环里面写1k次远程调用,因为我们1次就会阻塞50ms,我们for循环弄1k次那么就要等待1k×50ms
我们还要保证返回的结果是按照我们的请求顺序的
场景介绍:我们这边是C端的,我们不可能修改对方的代码,所以我们只能尽可能优化我们自己的代码提高接口效率
解决思路
1.通过Hash算法来分批运算,最后把结果存到map<Integer,String>里面然后来取,因为我们的顺序由id从低到高,所以我们可以通过id在map里面根据顺序取出然后放到我们的List里面
2.我们for循环,然后每一次循环都开启一个异步线程将结果存到Map里面,然后我们最终存到List。但我一开始有个问题,就是我没等全部执行完就存到我们的Map里面了,因为我不会写那个全局等待的代码......破防了
我最终的解决思路是2
package com.kira.scaffoldmvc.appender;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class RpcBatchRequestTest {
static RpcService rpcService = new RpcService();
public static void main(String[] args) throws ExecutionException, InterruptedException {
// rpc 请求参数
List<Integer> requestIds = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
// rpc 调用
List<String> results = batchGetDetails(requestIds);
// 输出
for (String result : results) {
System.out.println(result);
}
// 预期输出
// details 0
// details 1
// details 2
// .......
// details 999
}
/**
* 某个 rpc service 的接口只提供单个调用
* 此处需要做一个封装,多次请求后返回
*
* 要求按照顺序返回
*
* @param ids
* @return
*/
public static List<String> batchGetDetails(List<Integer> ids) throws ExecutionException, InterruptedException {
// 单次调用
// RpcService rpcService = new RpcService();
// String rpcResult = rpcService.rpcGetDetailsById(1);
List<String> list=new ArrayList<>();
HashMap<Integer,String> map=new HashMap<>();
List<CompletableFuture<Void>> futures = new ArrayList<>();
//for循环里面的每一个都开启一个for
for(int i=0;i<ids.size();i++)
{
int finalI = i;
CompletableFuture future=CompletableFuture.supplyAsync(() -> {
String s = rpcService.rpcGetDetailsById(ids.get(finalI));
map.put(finalI, s);
return s;
});
futures.add(future);
}
//futures.toArray(new CompletableFuture[0])) 将future数组转成CompletableFuture数组
//如果你传入 new CompletableFuture[0],Java 会动态调整数组大小,以适应 futures 中的元素数
//addOf()等待所有Completable异步线程都执行完
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// TODO 在此处实现批量调用
for(int i=0;i<ids.size();i++)
{
list.add(map.get(i));
}
return list;
}
}
class RpcService {
public String rpcGetDetailsById(int id) {
// 模拟 rpc service 耗时
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "details " + id;
}
}
分批推送的解决思路
每批为500份
package com.kira.scaffoldmvc.appender;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class RpcBatchRequestTest2 {
static RpcService rpcService = new RpcService();
public static void main(String[] args) throws ExecutionException, InterruptedException {
// rpc 请求参数
List<Integer> requestIds = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
// rpc 调用
List<String> results = batchGetDetails(requestIds);
// 输出
for (String result : results) {
System.out.println(result);
}
}
/**
* 按批次异步调用 RPC 接口,并确保按顺序返回
*
* @param ids 请求 ID 列表
* @return 按顺序返回的结果列表
*/
public static List<String> batchGetDetails(List<Integer> ids) throws ExecutionException, InterruptedException {
int batchSize = 500; // 每批大小
List<CompletableFuture<List<String>>> batchFutures = new ArrayList<>();
// 按批次切分数据
for (int i = 0; i < ids.size(); i += batchSize) {
int start = i;
int end = Math.min(i + batchSize, ids.size());
List<Integer> batch = ids.subList(start, end);
// 异步处理每个批次
CompletableFuture<List<String>> batchFuture = CompletableFuture.supplyAsync(() ->
batch.stream()
.map(rpcService::rpcGetDetailsById) // 调用 RPC 方法
.collect(Collectors.toList())
);
batchFutures.add(batchFuture);
}
// 等待所有批次完成并收集结果
List<String> results = new ArrayList<>();
CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<List<String>> future : batchFutures) {
results.addAll(future.get());
}
return results;
}
}
class RpcService2 {
public String rpcGetDetailsById(int id) {
// 模拟 rpc service 耗时
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "details " + id;
}
}