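VI. Thread Cooperation: Principles and Scenario Analysis

1. How do you coordinate multiple threads with the CountDownLatch synchronizer?

Explanation:

        CountDownLatch is a counter: a thread that calls await() blocks until other threads have counted it down to 0 (or until the optional timeout expires).

Several threads wait for several other threads to decrement the counter - code example: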

package imooc7.cdl;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

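/**
 * @Description: Several threads wait for several other threads to decrement the counter
 */
public class TestCountDownLatch {
    public static void main(String[] args) {
        // The int passed in is the number of countDown() calls the waiting threads need
        CountDownLatch countDownLatch = new CountDownLatch(2);
        new Thread(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Customer pays the balance");
            countDownLatch.countDown();
            System.out.println("Follow-up work after the payment");
        }).start();
        new Thread(()->{
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Furniture assembly finished");
            countDownLatch.countDown();
            System.out.println("Follow-up work after the assembly");
        }).start();

        // Shipping thread
        new Thread(()->{
            try {
                // Recommendation: use the await overload that takes a timeout.
                // It returns false if the count has not reached 0 in time.
                if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
                    System.out.println("Timed out waiting, not shipping");
                    return;
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Balance paid and furniture assembled, ready to ship");
            System.out.println("Shipping");
        }).start();

        // Email notification thread
        new Thread(()->{
            try {
                if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
                    System.out.println("Timed out waiting, not sending the email");
                    return;
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Balance paid and furniture assembled, ready to email the customer");
            System.out.println("Sending email");
        }).start();
    }
}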

Starting-gun pattern - code example:

package imooc7.cdl;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @Description: Test the starting-gun pattern
 */
public class TestCountDownLatch1 {
    public static void main(String[] args) {
        // For the starting-gun pattern the count is 1
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Simulate a track race
        for (int i = 0; i < 10; i++) {
            // 10 runners
            new Thread(()->{
                try {
                    // await returns false if the gun is not fired within the timeout
                    if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
                        System.out.println("No starting signal, runner gives up " + Thread.currentThread().getName());
                        return;
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("Runner starts "+Thread.currentThread().getName());
            }).start();
        }
        // Referee thread, fires the starting gun
        new Thread(()->{
            System.out.println("Referee gets ready to fire");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            countDownLatch.countDown();
        }).start();
    }
}

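Explanation (CountDownLatch compared with Thread.join()):

        1. join() has to be called on a Thread object; when the work runs in a thread pool, you have no Thread object for the worker to call it on;

        2. CountDownLatch is more general, whereas Thread.join() can only wait for the joined thread to terminate;

        3. With Thread.join(), the current thread can continue only after the joined thread has ended; with CountDownLatch, the worker thread can keep executing its own follow-up steps after calling countDown();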
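
The first point can be made concrete with a small sketch. It is not from the original course code: the class name LatchWithPoolSketch, the method runAndWait and the pool size are assumptions chosen for illustration. Tasks run in a thread pool, where no Thread object is available to join, and the caller waits on a latch instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchWithPoolSketch {
    // Submits taskCount tasks to a pool and waits for all of them on one latch.
    // Returns how many tasks finished, or -1 on timeout or interruption.
    static int runAndWait(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicInteger finished = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        finished.incrementAndGet();   // stand-in for real work
                    } finally {
                        latch.countDown();            // count down even if the work fails
                    }
                });
            }
            // No Thread object is needed here, unlike with Thread.join()
            boolean allDone = latch.await(5, TimeUnit.SECONDS);
            return allDone ? finished.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("finished tasks: " + runAndWait(6));
    }
}
```

Calling countDown() in a finally block is a design choice of this sketch: it keeps the waiting thread from blocking until the timeout when a task throws.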

2. How do you coordinate multiple threads with the CyclicBarrier synchronizer?

Code example:

package imooc7.cb;

import java.util.Random;
import java.util.concurrent.*;

/**
 * @Description: Simulates the flow of a multiplayer game
 */
public class TestCyclicBarrierOld {
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            // Simulate one player per thread
            int finalI = i;
            new Thread(()->{
                try {
                    // Match players
                    matchPlayer(finalI);

                    // Select a character
                    selectCharacter(finalI);

                    // Load game resources
                    loadGame(finalI);
                } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }

    private static void loadGame(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts loading game resources");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished loading game resources");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    private static void selectCharacter(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts selecting a character");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished selecting a character");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    private static void matchPlayer(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts matching players");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished matching players");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    public enum Step{
        NONE,
        MATCH,
        SELECT,
        LOAD;
    }
}

The same flow with a barrier action (callback) - code example:

package imooc7.cb;

import java.util.Random;
import java.util.concurrent.*;

/**
 * @Description: Simulates the flow of a multiplayer game
 */
public class TestCyclicBarrierOld {

    // The barrier action (callback) passed here runs synchronously
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(4, ()-> {
        try {
            // Synchronous: the waiting threads move on to the next step only after this code finishes
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("Game phase reached");
    });

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            // Simulate one player per thread
            int finalI = i;
            new Thread(()->{
                try {
                    // Match players
                    matchPlayer(finalI);

                    // Select a character
                    selectCharacter(finalI);

                    // Load game resources
                    loadGame(finalI);
                } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }

    private static void loadGame(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts loading game resources");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished loading game resources");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    private static void selectCharacter(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts selecting a character");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished selecting a character");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    private static void matchPlayer(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts matching players");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished matching players");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    public enum Step{
        NONE,
        MATCH,
        SELECT,
        LOAD;
    }
}

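Explanation:

        When the last thread of a phase arrives, the CyclicBarrier runs the callback (barrier action) synchronously in that thread, and the other threads are released only after the callback returns. If the callback is time-consuming, it can be changed to run asynchronously.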
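
The once-per-phase behaviour of the callback can be checked with a short sketch. The class BarrierActionSketch and the method countActionRuns are hypothetical names introduced here, not part of the course code; the barrier action only increments a counter so that the number of runs can be observed.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierActionSketch {
    // Runs `parties` threads through `phases` barrier points and returns
    // how many times the barrier action ran.
    static int countActionRuns(int parties, int phases) {
        AtomicInteger actionRuns = new AtomicInteger();
        // The action runs in the last thread to arrive, before the others are released
        CyclicBarrier barrier = new CyclicBarrier(parties, actionRuns::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int p = 0; p < phases; p++) {
                        barrier.await();   // the barrier resets itself after each trip
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return actionRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier action runs: " + countActionRuns(4, 3));
    }
}
```

With 4 parties and 3 phases the action runs 3 times, which is also why the same barrier instance can serve the match, select and load phases in the game example.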

Optimized to run asynchronously:

package imooc7.cb;

import java.util.Random;
import java.util.concurrent.*;

/**
 * @Description: Simulates the flow of a multiplayer game
 */
public class TestCyclicBarrierOld {

    // Note: this pool is never shut down, so its worker threads keep the JVM alive after the game ends
    static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(4, ()-> {
        // Hand the callback body to a thread pool so that it runs asynchronously instead of synchronously
        executorService.execute(()-> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Game phase reached");
        });

    });

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            // Simulate one player per thread
            int finalI = i;
            new Thread(()->{
                try {
                    // Match players
                    matchPlayer(finalI);

                    // Select a character
                    selectCharacter(finalI);

                    // Load game resources
                    loadGame(finalI);
                } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }

    private static void loadGame(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts loading game resources");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished loading game resources");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    private static void selectCharacter(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts selecting a character");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished selecting a character");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    private static void matchPlayer(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts matching players");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished matching players");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    public enum Step{
        NONE,
        MATCH,
        SELECT,
        LOAD;
    }
}

Optimized code - add a closing message for each phase:

package imooc7.cb;

import java.util.Random;
import java.util.concurrent.*;

/**
 * @Description: Simulates the flow of a multiplayer game
 */
public class TestCyclicBarrierOld {

    static Step step;
    static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(4, ()-> {
        // Runs synchronously here: if submitted to the pool instead, the switch could read step
        // after released threads have already advanced it to the next phase
//        executorService.execute(()-> {
            switch (step) {
                case MATCH -> System.out.println("Matching phase finished");
                case SELECT -> System.out.println("Character selection phase finished");
                case LOAD -> System.out.println("Loading phase finished");
            }
//        });

    });

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            // Simulate one player per thread
            int finalI = i;
            new Thread(()->{
                try {
                    // Match players
                    step = Step.MATCH;
                    matchPlayer(finalI);

                    // Select a character
                    step = Step.SELECT;
                    selectCharacter(finalI);

                    // Load game resources
                    step = Step.LOAD;
                    loadGame(finalI);

                } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }

    private static void loadGame(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts loading game resources");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished loading game resources");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    private static void selectCharacter(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts selecting a character");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished selecting a character");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    private static void matchPlayer(int playNumber) throws InterruptedException, BrokenBarrierException, TimeoutException {
        System.out.println("player "+playNumber+" starts matching players");
        Thread.sleep(1000+new Random().nextInt(3)*1000);
        System.out.println("player "+playNumber+" finished matching players");
        cyclicBarrier.await(10, TimeUnit.SECONDS);
    }

    public enum Step{
        NONE,
        MATCH,
        SELECT,
        LOAD;
    }
}

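3. Asynchronous programming (1): Introduction to CompletableFuture

Explanation (limitations of Future that CompletableFuture addresses):

1. Future only returns the result of an asynchronous task; it provides no callback mechanism;

2. It cannot orchestrate tasks, i.e. handle tasks that depend on one another;

3. It cannot handle competing tasks, for example returning as soon as the fastest task finishes.

Shortcomings of Future - code example: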

package imooc7.cf;

import java.util.concurrent.*;

/**
 * @Description: Shortcomings of Future
 */
public class FutureLimit {
    // Three steps to generate an order:
    // 1 Query the product information
    // 2 Query the user information
    // 3 Generate the order
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
        // Query the product information
        Future<String> productFuture = executorService.submit(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "iphone";
        });
        // Query the user information
        Future<String> userFuture = executorService.submit(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "Xiao Ming";
        });

        // Generate the order: this task has to block on the two futures above
        Future<String> orderFuture = executorService.submit(() -> {
            try {
                String productName = productFuture.get(3, TimeUnit.SECONDS);
                String userName = userFuture.get(3, TimeUnit.SECONDS);
                return productName + " : " + userName;
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new RuntimeException(e);
            }
        });
        try {
            System.out.println(orderFuture.get(3, TimeUnit.SECONDS));
        } finally {
            // Without shutdown() the pool's non-daemon worker threads keep the JVM alive
            executorService.shutdown();
        }
    }
}

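Conclusion:

        Although this implements the functionality described above, it still has shortcomings:

        1) Generating the order through a callback once the product and user queries finish is not supported;

        2) Task orchestration is not supported: "start the third task when the first two finish" cannot be expressed in the API, only emulated with blocking get() calls;

        3) The three tasks are tightly coupled in code.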
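
For contrast, here is a minimal sketch of the same three steps written with CompletableFuture. The class name OrderWithCompletableFutureSketch and the method buildOrder are assumptions made for this illustration, and the queries are replaced by constants; the point is only that the dependency between the steps is declared instead of being emulated with blocking calls.

```java
import java.util.concurrent.CompletableFuture;

class OrderWithCompletableFutureSketch {
    static String buildOrder() {
        // Steps 1 and 2 run independently of each other
        CompletableFuture<String> product = CompletableFuture.supplyAsync(() -> "iphone");
        CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> "Xiao Ming");
        // Step 3 is declared as depending on both results; no task blocks on get()
        CompletableFuture<String> order = product.thenCombine(user, (p, u) -> p + " : " + u);
        return order.join();
    }

    public static void main(String[] args) {
        System.out.println(buildOrder());
    }
}
```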

4. Asynchronous programming (2): Starting asynchronous tasks

Recommendation: in real business code, use the runAsync and supplyAsync overloads that take a thread pool.

runAsync example:

package imooc7.cf;

import java.util.concurrent.*;

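/**
 * @Description: Start an asynchronous task with runAsync
 */
public class CompletableFutureTest1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // runAsync with a custom thread pool
        ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Async task started " + Thread.currentThread().getName());
        },executorService);
        future.get();
        // With the executor passed in, the task runs on a pool thread (a name like pool-1-thread-1).
        // Without it, runAsync typically uses the common pool (a name like ForkJoinPool.commonPool-worker-1).
        executorService.shutdown();
    }
}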

supplyAsync example:

package imooc7.cf;

import java.util.concurrent.*;

/**
 * @Description: Start an asynchronous task with supplyAsync
 */
public class CompletableFutureTest2 {
    public static void main(String[] args)  {
        ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return "iphone";
        }, executorService);
        // Get the return value
//        String name = future.get();
        // join() throws only unchecked exceptions (CompletionException), so nothing has to be declared
        // get() throws checked exceptions that must be caught or declared
        future.join();
        executorService.shutdown();
    }
}

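Note:

        1) Both future.get() and future.join() return the result of the asynchronous task;

        2) join() throws only unchecked exceptions (a failure surfaces as CompletionException), so no checked exception needs handling; get() throws the checked InterruptedException and ExecutionException, which must be caught or declared. join() is usually the more elegant choice.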
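
The difference in exception types can be sketched as follows. The class GetVersusJoinSketch and its methods are hypothetical names for this illustration; the failing task simply throws an IllegalStateException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class GetVersusJoinSketch {
    // A task that always fails, used to compare the two ways of reading a result
    static CompletableFuture<String> failing() {
        return CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // get(): the checked exceptions must be caught or declared
    static String viaGet() {
        try {
            return failing().get();
        } catch (ExecutionException e) {
            return "get -> ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join(): no checked exception; the failure surfaces as an unchecked CompletionException
    static String viaJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return "join -> CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet());
        System.out.println(viaJoin());
    }
}
```

In both cases the original exception is available through getCause(); only the wrapper type and the need for a try/catch differ.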

5. Asynchronous programming (3): Callbacks for asynchronous tasks

thenRun code example:

package imooc7.cf;

import java.util.concurrent.*;

/**
 * @Description: Test callbacks
 */
public class TestCallBack {
    public static void main(String[] args) {
        // 1 Test thenRun
        CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName());
        }).thenRun(() -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("thenRun callback executed");
        });
        future.join();
    }
}

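Explanation:

        In this run the thenRun callback executes on the same thread that ran the runAsync task. This is typical but not guaranteed: a non-async callback runs on whichever thread completes the previous stage, or on the calling thread if that stage has already completed when thenRun is registered.

thenRunAsync example: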

package imooc7.cf;

import java.util.concurrent.*;

/**
 * @Description: Test callbacks
 */
public class TestCallBack {
    public static void main(String[] args) {
        // 1 Test thenRunAsync
        ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
        CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName());
        }).thenRunAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("thenRunAsync callback executed");
        }, executorService);
        future.join();
        // Shut the pool down so that its worker threads do not keep the JVM alive
        executorService.shutdown();
    }
}

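Explanation:

        The thenRunAsync callback is submitted to the given executor, so it runs asynchronously and generally on a different thread from the one that ran the runAsync task.

thenAccept (synchronous callback) code example:
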
package imooc7.cf;

import java.util.concurrent.*;

/**
 * @Description: Test callbacks
 */
public class TestCallBack {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));

        // 2 Test thenAccept()
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName());
            return "iphone";
        },executorService).thenAccept(name -> {
            // The parameter name is the "iphone" returned by the asynchronous task above
            System.out.println(name+" "+Thread.currentThread().getName());
        });
        future.join();
        // Shut the pool down so that its worker threads do not keep the JVM alive
        executorService.shutdown();
    }
}

thenAcceptAsync (asynchronous callback) code example:

package imooc7.cf;

import java.util.concurrent.*;

/**
 * @Description: Test callbacks
 */
public class TestCallBack {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
        // 2 Test thenAcceptAsync()
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName());
            return "iphone";
        },executorService).thenAcceptAsync(name -> {
            // The parameter name is the "iphone" returned by the asynchronous task above
            System.out.println(name+" "+Thread.currentThread().getName());
        },executorService);
        future.join();
        // Shut the pool down so that its worker threads do not keep the JVM alive
        executorService.shutdown();
    }
}

thenApply callback example:

package imooc7.cf;

import java.util.concurrent.*;

/**
 * @Description: Test callbacks
 */
public class TestCallBack {
    public static void main(String[] args) {
        // 3 Test thenApply: takes the previous result and returns a new value
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName());
            return "iphone";
        }).thenApply(name -> {
            System.out.println(name + " " + Thread.currentThread().getName());
            return 1;
        });
        Integer join = future.join();
        System.out.println(join);
    }
}

thenApplyAsync callback example:

package imooc7.cf;

import java.util.concurrent.*;

/**
 * @Description: Test callbacks
 */
public class TestCallBack {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
        // 3 Test thenApplyAsync
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName());
            return "iphone";
        }).thenApplyAsync(name -> {
            System.out.println(name + " " + Thread.currentThread().getName());
            return 1;
        }, executorService);
        Integer join = future.join();
        System.out.println(join);
        // Shut the pool down so that its worker threads do not keep the JVM alive
        executorService.shutdown();
    }
}

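Summary:

        Choose the callback according to whether it needs the previous result as a parameter and whether it returns a value: thenRun takes no parameter and returns nothing, thenAccept takes the previous result and returns nothing, and thenApply takes the previous result and returns a new value.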
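
The three callback shapes can be put side by side in one sketch. The class CallbackKindsSketch and the method demo are names invented for this illustration; the source future is already completed, so every callback runs immediately on the calling thread and a plain StringBuilder is safe to use.

```java
import java.util.concurrent.CompletableFuture;

class CallbackKindsSketch {
    static String demo() {
        StringBuilder log = new StringBuilder();
        CompletableFuture<String> source = CompletableFuture.completedFuture("iphone");

        // thenRun: Runnable, no parameter, no return value
        source.thenRun(() -> log.append("run;")).join();

        // thenAccept: Consumer, receives the previous result, no return value
        source.thenAccept(name -> log.append("accept:").append(name).append(';')).join();

        // thenApply: Function, receives the previous result and returns a new value
        int length = source.thenApply(String::length).join();
        log.append("apply:").append(length);

        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```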

6. Asynchronous programming (4): Orchestrating asynchronous tasks

thenCompose example:

package imooc7.cf;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

/**
 * @Description: Test thenCompose
 * Get a user's order:
 * 1 Get the user information
 * 2 Get the order that belongs to that user
 */
public class TestThenCompose {
    public static void main(String[] args) {
        // Get the user information
        CompletableFuture<User> userCompletableFuture = CompletableFuture.supplyAsync(() -> {
            return getUserInfo();
        });
        // Get the user's order
        CompletableFuture<Order> orderCompletableFuture = userCompletableFuture.thenCompose(user -> {
            return getUserOrder(user);
        });
        // For comparison: thenApply with the same function yields a nested future
        CompletableFuture<CompletableFuture<Order>> completableFutureCompletableFuture = userCompletableFuture.thenApply(user -> getUserOrder(user));

        System.out.println(orderCompletableFuture.join());
        // thenCompose flattens like flatMap, which turns nested data into single-level data
//        List<klass> klasses = new ArrayList<>();
//        //List<User> list ??
//        List<List<User>> collect = klasses.stream().map(klass -> klass.userList).collect(Collectors.toList());
//        List<User> collect1 = klasses.stream().flatMap(klass -> klass.userList.stream()).collect(Collectors.toList());
    }

    private static CompletableFuture<Order> getUserOrder(User user) {
        return CompletableFuture.supplyAsync(()->{
            return new Order(10,user.getId(),"phone");
        });
    }

    private static User getUserInfo() {
        return new User(1,"Xiao Ming");
    }

    static class Order{
        private Integer id;
        private Integer userId;
        private String product;

        public Order(Integer id, Integer userId, String product) {
            this.id = id;
            this.userId = userId;
            this.product = product;
        }

        @Override
        public String toString() {
            return "Order{" +
                    "id=" + id +
                    ", userId=" + userId +
                    ", product='" + product + '\'' +
                    '}';
        }
    }

    static class User{
        private Integer id;
        private String name;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public User(Integer id, String name) {
            this.id = id;
            this.name = name;
        }
    }
    // School class type, used only by the commented-out flatMap illustration
    static class klass{
        List<User> userList;
    }
}

thenCombine example:

package imooc7.cf;


import java.util.concurrent.CompletableFuture;

/**
 * @Description: Test thenCombine
 * Generate an order:
 * 1 Query the user information
 * 2 Query the product information
 * then assemble the order
 */
public class TestThenCombine {
    public static void main(String[] args) {
        // 1 Query the user information
        CompletableFuture<User> userCompletableFuture = CompletableFuture.supplyAsync(() -> {
            return getUserInfo();
        });
        // 2 Query the product information
        CompletableFuture<Product> productCompletableFuture = CompletableFuture.supplyAsync(() -> {
            return getProductInfo();
        });
        // Combine the results of 1 and 2
        // The parameters user and product are the results of the two CompletableFutures above
        CompletableFuture<Order> orderCompletableFuture = userCompletableFuture.thenCombine(productCompletableFuture, (user, product) -> {
            return new Order(user, product);
        });
        System.out.println(orderCompletableFuture.join());

    }

    private static Product getProductInfo() {
        return new Product(10,"phone");
    }

    private static User getUserInfo() {
        return new User(1,"Xiao Ming");
    }
    static class User{
        private Integer id;
        private String name;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public User(Integer id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

    static class Product{
        private Integer id;
        private String productName;

        public Product(Integer id, String productName) {
            this.id = id;
            this.productName = productName;
        }

        @Override
        public String toString() {
            return "Product{" +
                    "id=" + id +
                    ", productName='" + productName + '\'' +
                    '}';
        }
    }

    static class Order{
        private User user;
        private Product product;

        public Order(User user, Product product) {
            this.user = user;
            this.product = product;
        }

        @Override
        public String toString() {
            return "Order{" +
                    "user=" + user +
                    ", product=" + product +
                    '}';
        }
    }
}

7. Asynchronous programming (5): Handling competing tasks

anyOf() example:

package imooc7.cf;

import java.util.concurrent.CompletableFuture;

/**
 * @Description: Test anyOf: returns as soon as any one of the futures completes
 */
public class TestAnyOf {
    public static void main(String[] args) {
        // Query the weather forecast from three services
        CompletableFuture<String> weatherForService = getWeatherForService("A",1000L);
        CompletableFuture<String> weatherForService1 = getWeatherForService("B",2000L);
        CompletableFuture<String> weatherForService2 = getWeatherForService("C",3000L);
        CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(weatherForService, weatherForService1, weatherForService2);
        System.out.println(objectCompletableFuture.join());
    }
    public static CompletableFuture<String> getWeatherForService(String serviceName, Long time){
        return CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return serviceName+" 20";
        });
    }
}
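
Because anyOf() returns CompletableFuture<Object>, the caller has to cast the result. A small typed wrapper is one way to hide that cast; the helper firstOf and the class AnyOfTypedSketch below are hypothetical and not part of the JDK or of the course code. The demo uses an already-completed future and a never-completed one so that the winner does not depend on timing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AnyOfTypedSketch {
    // Hypothetical helper: completes with the result of whichever future finishes first
    @SuppressWarnings("unchecked")
    static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures) {
        return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(result -> (T) result);   // safe: every input future yields a T
    }

    static String demo() {
        CompletableFuture<String> slow = new CompletableFuture<>();              // never completed here
        CompletableFuture<String> fast = CompletableFuture.completedFuture("B 20");
        return firstOf(Arrays.asList(slow, fast)).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that anyOf() does not cancel the futures that lose the race; they keep running unless the caller cancels them.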

allOf example:

package imooc7.cf;

import java.util.concurrent.CompletableFuture;

/**
 * @Description: Test the allOf method
 * Price comparison:
 * the prices can be compared only after all platforms have returned
 */
public class TestAllOf {
    public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture1 = getPriceFromPlatform("A", 1000L);
        CompletableFuture<Integer> completableFuture2 = getPriceFromPlatform("B", 2000L);
        CompletableFuture<Integer> completableFuture3 = getPriceFromPlatform("C", 3000L);
        CompletableFuture<Void> future = CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3);
        future.join();
//        System.out.println(completableFuture1.join());
//        System.out.println(completableFuture2.join());
//        System.out.println(completableFuture3.join());
        Integer priceA = completableFuture1.join();
        Integer priceB = completableFuture2.join();
        Integer priceC = completableFuture3.join();
        int min = Math.min(priceA, Math.min(priceB, priceC));
        System.out.println(min);
    }

    public static CompletableFuture<Integer> getPriceFromPlatform(String platform,Long time){
        return CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            if(platform.equals("A")){
                return 100;
            }else if(platform.equals("B")){
                return 200;
            }else{
                return 300;
            }
        });
    }
}
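
allOf() completes with Void, which is why the example above reads each future again with join(). When the number of futures is not fixed, that step can be wrapped in a helper. The sketch below is an assumption-laden illustration: allResults and AllOfCollectSketch are invented names, and the prices are constants instead of real platform calls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfCollectSketch {
    // Hypothetical helper: waits for all futures and gathers their results in input order
    static <T> CompletableFuture<List<T>> allResults(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                // every future is complete at this point, so join() does not block
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    static int cheapest() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 200);
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> 300);
        List<Integer> prices = allResults(Arrays.asList(a, b, c)).join();
        return prices.stream().min(Integer::compare).get();
    }

    public static void main(String[] args) {
        System.out.println(cheapest());
    }
}
```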

8. Asynchronous programming (6): Exception handling in asynchronous tasks

exceptionally() code example:

package imooc7.cf;

import java.util.concurrent.CompletableFuture;

/**
 * @Description: Test exceptionally
 */
public class TestException {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "async1";
        }).thenApply(content -> {
            int i = 1/0;
            return content + " async2";
        }).thenApply(content -> {
            return content + " async3";
        }).exceptionally(ex->{
            // Exception handling: ex is a CompletionException that wraps the ArithmeticException
            System.out.println(ex.getMessage());
            return "exception";
        });
        System.out.println(future.join());
    }

}

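Summary:

        Whether the exception is thrown in the asynchronous task itself or in one of the callbacks, a trailing exceptionally() can catch it; the stages between the failure and exceptionally() are skipped.

handle() code example: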

package imooc7.cf;

import java.util.concurrent.CompletableFuture;

/**
 * @Description: Test handle
 */
public class TestHandle {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "async1";
        }).handle((res,ex)->{
            System.out.println("handle executed");
            if(ex!=null){
                System.out.println(ex.getMessage());
                return "exception1";
            }
            return res;
        }).thenApply(content -> {
            int i = 1/0;
            return content + " async2";
        }).handle((res,ex)->{
            if(ex!=null){
                System.out.println(ex.getMessage());
                return "exception2";
            }
            return res;
        }).thenApply(content -> {
            return content + " async3";
        });
        System.out.println(future.join());
    }
}

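Summary: every asynchronous stage can be followed by its own handle(), which receives both the result and the exception of the preceding stage and returns the value passed on to the next stage.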

package imooc7.cf;

import java.util.concurrent.CompletableFuture;

/**
 * @Description: observing the final result or exception with whenComplete()
 */
public class TestWhenComplete {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "async1";
        }).thenApply(content -> {
            int i = 1/0;
            return content + " async2";
        }).thenApply(content -> {
            return content + " async3";
        }).whenComplete((res,ex)->{
            if(ex!=null){
                System.out.println(ex.getMessage());
            }else{
                System.out.println(res);
            }

        });
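        // join() is left commented out: whenComplete() does not recover from the failure,
        // so future.join() would rethrow it as a CompletionException.
//        System.out.println(future.join());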
    }
}

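Summary:

        There are three ways to handle exceptions. How do you choose between them?

        1) For one general handler at the end of the chain, use exceptionally() or whenComplete(). exceptionally() can replace the failure with a fallback value; whenComplete() only observes the outcome and passes the original result or exception on unchanged.

        2) If individual asynchronous steps need their own customized handling, use handle().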
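The difference between the three methods can be seen side by side in a small sketch. This is an illustration under assumptions rather than code from the original article: the class `CompareHandlersSketch`, its method names, and the always-failing `failing()` stage are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CompareHandlersSketch {
    // Hypothetical stage that always fails, standing in for any failing async step.
    static CompletableFuture<String> failing() {
        return CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // exceptionally(): runs only on failure and replaces it with a fallback value.
    static String withExceptionally() {
        return failing().exceptionally(ex -> "fallback").join();
    }

    // handle(): always runs, sees both result and exception, and returns a new value.
    static String withHandle() {
        return failing().handle((res, ex) -> ex != null ? "handled" : res).join();
    }

    // whenComplete(): always runs but cannot change the outcome,
    // so join() still throws a CompletionException.
    static String withWhenComplete() {
        try {
            return failing().whenComplete((res, ex) -> { /* observe only */ }).join();
        } catch (CompletionException e) {
            return "still failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(withExceptionally());
        System.out.println(withHandle());
        System.out.println(withWhenComplete());
    }
}
```

In short, whenComplete() fits logging or cleanup, while exceptionally() and handle() are the ones that can turn a failed chain back into a normal result.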

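9. CompletionService: how to run asynchronous tasks in batches?

Requirement: convert product links into QR codes, return the coded links and send them to the customer, with support for processing links in batches.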

package imooc7.cs;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

/**
 * @Description: adding QR codes to images
 */
public class TestAddCode {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
        // Image URLs to process
        List<String> images = new ArrayList<>();
        images.add("image1");
        images.add("image2");
        images.add("image3");
        images.add("image4");
        images.add("image5");
        images.add("image6");
        List<Future<String>> futures = new ArrayList<>();
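        for (String image : images) {
            Future<String> future = executorService.submit(() -> {
                try {
                    // Simulate 0-4 seconds of work; nextInt(bound) is available on every
                    // Java version, whereas Random.nextLong(bound) requires Java 17+
                    Thread.sleep(new Random().nextInt(5) * 1000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return image+ " Test";
            });
            futures.add(future);
        }
        // get() blocks in submission order, so a slow early task delays later results
        for (Future<String> future : futures) {
            String res = future.get();
            sendToCustomer(res);
        }
        // Shut the pool down; otherwise its non-daemon core threads keep the JVM alive
        executorService.shutdown();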
    }

    private static void sendToCustomer(String res) {
        System.out.println("send image to customer "+res);
    }
}

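Result: the images are sent to the customer strictly in submission order (image1 through image6), because future.get() blocks on each task in turn, even when later tasks have already finished.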

package imooc7.cs;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

/**
 * @Description: forwarding results through a BlockingQueue so they are sent in completion order
 */
public class TestAddCode1 {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(100);
        // Image URLs to process
        List<String> images = new ArrayList<>();
        images.add("image1");
        images.add("image2");
        images.add("image3");
        images.add("image4");
        images.add("image5");
        images.add("image6");
        List<Future<String>> futures = new ArrayList<>();
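        for (String image : images) {
            Future<String> future = executorService.submit(() -> {
                try {
                    // Simulate 0-4 seconds of work; nextInt(bound) is available on every
                    // Java version, whereas Random.nextLong(bound) requires Java 17+
                    Thread.sleep(new Random().nextInt(5) * 1000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return image+ " Test";
            });
            futures.add(future);
        }
        // One forwarding task per future: each puts its result on the queue as soon as it is ready
        for (Future<String> future : futures) {
            executorService.submit(()->{
                try {
                    blockingQueue.put(future.get());
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        // take() returns results in the order they were put, i.e. in completion order
        for (int i = 0; i < images.size(); i++) {
            String take = blockingQueue.take();
            sendToCustomer(take);
        }
        // Shut the pool down; otherwise its non-daemon core threads keep the JVM alive
        executorService.shutdown();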

    }
    private static void sendToCustomer(String res) {
        System.out.println("send image to customer "+res);
    }
}

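Result: the images are now sent in the order in which the tasks finish, at the cost of an extra queue and an extra forwarding task per image.

CompletionService example: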

package imooc7.cs;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

/**
 * @Description: implementing the QR code task with CompletionService
 */
public class TestAddCode2 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
        CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
        // Image URLs to process
        List<String> images = new ArrayList<>();
        images.add("image1");
        images.add("image2");
        images.add("image3");
        images.add("image4");
        images.add("image5");
        images.add("image6");
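        for (String image : images) {
            completionService.submit(()->{
                // Simulate 0-4 seconds of work; nextInt(bound) is available on every
                // Java version, whereas Random.nextLong(bound) requires Java 17+
                Thread.sleep(new Random().nextInt(5) * 1000L);
                return image+" Test";
            });
        }
        // take() blocks until the next task finishes, whichever one that is
        for (int i = 0; i < images.size(); i++) {
            String res = completionService.take().get();
            sendToCustomer(res);
        }
        // Shut the pool down; otherwise its non-daemon core threads keep the JVM alive
        executorService.shutdown();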

    }
    private static void sendToCustomer(String res) {
        System.out.println("send image to customer "+res);
    }
}

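Result: the images are again sent in completion order, but ExecutorCompletionService maintains the queue of finished tasks internally.

Written this way, the code is much more concise.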
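One case the examples above do not cover is a task that fails: take().get() then throws an ExecutionException, which would abort the whole loop. The following sketch shows one possible way to keep processing the remaining results. It is not from the original article; the class `CompletionServiceFailureSketch`, the `process` method, and the rule that an empty link counts as a failure are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionServiceFailureSketch {
    // Hypothetical batch method: returns one entry per image, in completion order.
    static List<String> process(List<String> images) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        List<String> results = new ArrayList<>();
        try {
            for (String image : images) {
                cs.submit(() -> {
                    // Assumption for the sketch: an empty link cannot be processed
                    if (image.isEmpty()) {
                        throw new IllegalArgumentException("empty image link");
                    }
                    return image + " Test";
                });
            }
            for (int i = 0; i < images.size(); i++) {
                try {
                    results.add(cs.take().get());
                } catch (ExecutionException e) {
                    // One failed task does not stop the remaining results
                    results.add("failed: " + e.getCause().getMessage());
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what has been collected so far
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> images = new ArrayList<>();
        images.add("image1");
        images.add("");
        images.add("image2");
        for (String res : process(images)) {
            System.out.println("send image to customer " + res);
        }
    }
}
```

Because results arrive in completion order, the position of the failure entry in the returned list can differ from run to run.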

