当前位置: 首页 > article >正文

交替打印数字/拓展多线程/多种实现方式

思路

整体的思路是使用一个原子计数器,用于每个线程计算是否轮到自己输出数字。
分别使用了

  • Object.wait() Object.notifyAll()
  • Lock.condition.await() Lock.condition.signalAll()
  • Semaphore.acquire() Semaphore.release()
  • LockSupport.park() LocakSupport.unPark(Thread thread)

Object.wait()

使用了 CompletableFuture.runAsync(Runnable runnanle) 来执行线程,
有关CompletableFuture 使用请查看这篇文章 CompletableFuture 看这篇操作就够了
使用了 CompletableFuture.allOf(CompletableFuture<?>[]) 来等待所有线程执行完毕。
使用了 IntStream.range(int startInclusive, int endExclusive) 创建了一个从起始位值到终点位置的顺序序列流
使用了 synchronized 阻塞其他线程,使多线程只有其中一个可以获取到锁
使用了一个while 循环来判断是否进入了当前线程打印数字逻辑,如果进入了则使线程等待,执行打印数字逻辑,然后唤醒所有等待的线程

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class ObjectPrint {

    private final AtomicLong total = new AtomicLong(0);
    private final int threadCount = 5;
    private final int printCount = 12;

    private final Object obj = new Object();

    @Test
    public void print() {
        CompletableFuture.allOf(init()).join();
    }

    private CompletableFuture<?>[] init() {
        CompletableFuture<?>[] futures = new CompletableFuture<?>[threadCount];
        for (int i = 0; i < threadCount; i++) {
            futures[i] = CompletableFuture.runAsync(print(i));
        }
        return futures;
    }

    public Runnable print(int threadNum) {
        return () -> {
            IntStream.range(0, printCount)
                    .forEach(k -> {
                        while (total.get() != threadNum) {
                            synchronized (obj) {
                                try {
                                    obj.wait();
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                            }
                        }
                        System.out.println("Thread " + (threadNum + 1) + ": " + (threadNum + 1));
                        total.set((total.get() + 1) % threadCount);
                        synchronized (obj) {
                            obj.notifyAll();
                        }
                    });
        };

    }

}

Lock.condition.await()

public class ConditionPrint {

    private final AtomicLong total = new AtomicLong(0);
    private final int threadCount = 5;
    private final int printCount = 12;

    private final Lock lock = new ReentrantLock(true);
    private final Condition condition = lock.newCondition();

    @Test
    public void print() {
        CompletableFuture.allOf(init()).join();
    }

    private CompletableFuture<?>[] init() {
        CompletableFuture<?>[] futures = new CompletableFuture<?>[threadCount];
        for (int i = 0; i < threadCount; i++) {
            futures[i] = CompletableFuture.runAsync(print(i));
        }
        return futures;
    }

    public Runnable print(int threadNum) {
        return () -> {
            IntStream.range(0, printCount)
                    .forEach(k -> {
                        lock.lock();
                        try {
                            while (total.get() != threadNum) {
                                condition.await();
                            }
                            System.out.println("Thread " + (threadNum + 1) + ": " + (threadNum + 1));
                            total.set((total.get() + 1) % threadCount);
                            condition.signalAll();
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        } finally {
                            lock.unlock();
                        }
                    });
        };

    }

}

Semaphore.acquire()

public class SemaphoreTest {

    private final AtomicLong total = new AtomicLong(0);
    private final int threadCount = 5;
    private final int printCount = 5;

    private final Semaphore semaphore = new Semaphore(1, true);

    @Test
    public void print() {
        CompletableFuture.allOf(init()).join();
    }

    private CompletableFuture<?>[] init() {
        CompletableFuture<?>[] futures = new CompletableFuture<?>[threadCount];
        for (int i = 0; i < threadCount; i++) {
            futures[i] = CompletableFuture.runAsync(print(i));
        }
        return futures;
    }

    public Runnable print(int threadNum) {
        return () -> {
            IntStream.range(0, printCount)
                    .forEach(k -> {
                        while (true) {
                            try {
                                semaphore.acquire();
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }

                            // 检查是否轮到当前线程打印
                            if (total.get() == threadNum) {
                                System.out.println("Thread " + (threadNum + 1) + ": " + (threadNum + 1));
                                // 更新为下一个线程编号
                                total.set((total.get() + 1) % threadCount);
                                semaphore.release();
                                break; // 退出循环
                            }
                            // 如果不是该线程打印,则释放信号量并继续循环
                            semaphore.release();
                        }
                    });
        };

    }

}

LockSupport.park()

public class LockSupportPrint {

    private final AtomicLong total = new AtomicLong(0);
    private final int threadCount = 5;
    private final int printCount = 5;

    private final Thread[] threads = new Thread[threadCount];

    @Test
    public void print() {
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(print(i));
            threads[i].start();
        }

        // 唤醒第一个线程开始打印
        LockSupport.unpark(threads[0]);

        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

    }

    public Runnable print(int threadNum) {
        return () -> {
            IntStream.range(0, printCount)
                    .forEach(k -> {
                        // 进入循环,直到获取到信号量
                        while (true) {
                            // 检查是否轮到当前线程打印
                            if (total.get() == threadNum) {
                                System.out.println("Thread " + (threadNum + 1) + ": " + (threadNum + 1));
                                // 更新为下一个线程编号
                                total.set((total.get() + 1) % threadCount); 
                                LockSupport.unpark(threads[(threadNum + 1) % threadCount]);
                                break;
                            }
                            LockSupport.park();
                        }
                    });
        };

    }
    
}

http://www.kler.cn/news/363018.html

相关文章:

  • Apache Seata 新版本集成了 RocketMQ 事务消息
  • Git 文件大小写混乱?
  • 对接金蝶云星空存货档案到MES系统的详细步骤及javajs动态脚本拉取的实现
  • 10. 异常处理器
  • Redis提供了专门的命令来实现自增操作
  • U盘文件或目录损坏且无法读取:原因、恢复与预防全攻略
  • 德高望重的c语言
  • 量子纠错--shor‘s 码
  • Fast-LLM:加速大型语言模型训练的开源库
  • 抖店自动售后系统
  • # Tomcat NIO 配置实操指南
  • pytorch nn.NLLLoss和nn.CrossEntropyLoss函数区别
  • vue,java,webSocket通讯,服务端主动给多客户端发消息
  • 精准布局:探索CSS中的盒子固定定位的魅力
  • [计算机网络]第一周
  • ES6运算符
  • 汽车票预订系统:基于SpringBoot的创新
  • 046_python基于Hadoop的智慧校园数据共享平台
  • css预编译器-Sass常用命令详解
  • 数据库实战:MySQL、SQL语句总结与应用案例分享
  • PDF编辑功能是灰色的,什么原因?
  • Lua for循环语句
  • edge浏览器:你的连接不是专用连接
  • RSA_dp泄露
  • 因特网的概述
  • Ubuntu22.04 加入AD域