当前位置: 首页 > article >正文

模拟实战-用CompletableFuture优化远程RPC调用

实战场景

 这是广州某500-900人互联网厂的面试原题

手写并发优化解决思路

我们要调用对方的RPC接口,我们的RPC接口每调用一次对方都会阻塞50ms

但是我们的业务要批量调用RPC,例如我们要批量调用1k次,我们不可能在for循环里面写1k次远程调用,因为我们1次就会阻塞50ms,我们for循环弄1k次那么就要等待1k×50ms

我们还要保证返回的结果是按照我们的请求顺序的

场景介绍:我们这边是C端的,我们不可能修改对方的代码,所以我们只能尽可能优化我们自己的代码提高接口效率


解决思路

1.通过Hash算法来分批运算,最后把结果存到map<Integer,String>里面然后来取,因为我们的顺序由id从低到高,所以我们可以通过id在map里面根据顺序取出然后放到我们的List里面

2.我们for循环,然后每一次循环都开启一个异步线程将结果存到Map里面,然后我们最终存到List。但我一开始有个问题,就是我没等全部执行完就存到我们的Map里面了,因为我不会写那个全局等待的代码......破防了

我最终的解决思路是2

package com.kira.scaffoldmvc.appender;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RpcBatchRequestTest {

    static RpcService rpcService = new RpcService();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // rpc 请求参数
        List<Integer> requestIds = IntStream.range(0, 1000).boxed().collect(Collectors.toList());

        // rpc 调用
        List<String> results = batchGetDetails(requestIds);

        // 输出
        for (String result : results) {
            System.out.println(result);
        }
        // 预期输出
        // details 0
        // details 1
        // details 2
        // .......
        // details 999
    }

    /**
     * 某个 rpc service 的接口只提供单个调用
     * 此处需要做一个封装,多次请求后返回
     *
     * 要求按照顺序返回
     *
     * @param ids
     * @return
     */

    public static List<String> batchGetDetails(List<Integer> ids) throws ExecutionException, InterruptedException {
//         单次调用
//         RpcService rpcService = new RpcService();
//         String rpcResult = rpcService.rpcGetDetailsById(1);


        List<String> list=new ArrayList<>();

        HashMap<Integer,String> map=new HashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //for循环里面的每一个都开启一个for
        for(int i=0;i<ids.size();i++)
        {

            int finalI = i;
            CompletableFuture future=CompletableFuture.supplyAsync(() -> {

                            String s = rpcService.rpcGetDetailsById(ids.get(finalI));
                            map.put(finalI, s);

                return s;
            });
futures.add(future);


        }

        //futures.toArray(new CompletableFuture[0]))      将future数组转成CompletableFuture数组
        //如果你传入 new CompletableFuture[0],Java 会动态调整数组大小,以适应 futures 中的元素数
        //addOf()等待所有Completable异步线程都执行完
        
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // TODO 在此处实现批量调用
        for(int i=0;i<ids.size();i++)
        {
            list.add(map.get(i));
        }

        return list;


    }
}

class RpcService {
    public String rpcGetDetailsById(int id) {
        // 模拟 rpc service 耗时
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "details " + id;
    }
}

分批推送的解决思路

每批为500份

package com.kira.scaffoldmvc.appender;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RpcBatchRequestTest2 {

    static RpcService rpcService = new RpcService();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // rpc 请求参数
        List<Integer> requestIds = IntStream.range(0, 1000).boxed().collect(Collectors.toList());

        // rpc 调用
        List<String> results = batchGetDetails(requestIds);

        // 输出
        for (String result : results) {
            System.out.println(result);
        }
    }

    /**
     * 按批次异步调用 RPC 接口,并确保按顺序返回
     *
     * @param ids 请求 ID 列表
     * @return 按顺序返回的结果列表
     */
    public static List<String> batchGetDetails(List<Integer> ids) throws ExecutionException, InterruptedException {
        int batchSize = 500; // 每批大小
        List<CompletableFuture<List<String>>> batchFutures = new ArrayList<>();

        // 按批次切分数据
        for (int i = 0; i < ids.size(); i += batchSize) {
            int start = i;
            int end = Math.min(i + batchSize, ids.size());
            List<Integer> batch = ids.subList(start, end);

            // 异步处理每个批次
            CompletableFuture<List<String>> batchFuture = CompletableFuture.supplyAsync(() -> 
                batch.stream()
                     .map(rpcService::rpcGetDetailsById) // 调用 RPC 方法
                     .collect(Collectors.toList())
            );

            batchFutures.add(batchFuture);
        }

        // 等待所有批次完成并收集结果
        List<String> results = new ArrayList<>();
        CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).join();

        for (CompletableFuture<List<String>> future : batchFutures) {
            results.addAll(future.get());
        }

        return results;
    }
}

class RpcService2 {
    public String rpcGetDetailsById(int id) {
        // 模拟 rpc service 耗时
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "details " + id;
    }
}

http://www.kler.cn/a/531053.html

相关文章:

  • React+Cesium基础教程(003):加载3D建筑物和创建标签
  • C语言教学第四课:控制结构
  • 深入理解linux中的文件(上)
  • 41. 缺失的第一个正数
  • C#面试常考随笔9:什么是闭包?
  • 《数据可视化新高度:Graphy的AI协作变革》
  • 【优先算法】专题——位运算
  • 存储器知识点2
  • 基础IO的学习
  • 代码随想录-训练营-day18
  • 【go语言】grpc 快速入门
  • 30分钟入门CompletableFuture并发工具使用
  • 【漫话机器学习系列】077.范数惩罚是如何起作用的(How Norm Penalties Work)
  • mac执行brew services list时,无法连接GitHub
  • 谷歌Titans模型论文解析,Transformer迎来变革拐点——DeepSeek能否“接招”?
  • 七. Redis 当中 Jedis 的详细刨析与使用
  • 【自开发工具介绍】SQLSERVER的ImpDp和ExpDp工具03
  • 【建站】专栏目录
  • 51c视觉~CV~合集10
  • Windows图形界面(GUI)-QT-C/C++ - QT Stacked Widget
  • 运维自动化工具集:构建高效运维体系的密钥
  • 浏览器模块化难题
  • 解决vscode扩展插件开发webview中的请求跨域问题
  • 【前端】ES6模块化
  • 大模型综合性能考题汇总
  • 【PyQt】keyPressEvent键盘按压事件无响应