AI大模型编写多线程并发框架(六十三):监听器优化·下
系列文章目录
文章目录
- 系列文章目录
- 前言
- 一、项目背景
- 二、第十一轮对话-修正运行时数据
- 三、修正任务计数器
- 四、第十二轮对话-生成单元测试
- 五、验证通过
- 七、参考文章
前言
在这个充满技术创新的时代,AI大模型正成为开发者们的新宠。它们可以帮助我们完成从简单的问答到复杂的编程任务,所以AI编程将会是未来的主流方向,利用AI大模型的能力,本文将介绍从零到一用AI大模型编写一个多线程并发框架。
一、项目背景
经过上两篇文章和AI的对话,我们基本捣鼓出来了多线程并发框架的雏形,并且接入了监听器,但还是存在一些问题,限于篇幅,本文继续优化监听器。
本多线程框架使用示例如下:源码地址
1、引入依赖。
<dependency>
<groupId>io.github.vipjoey</groupId>
<artifactId>mmc-juc</artifactId>
<version>1.0</version>
</dependency>
2、使用示例。
// 创建一个MmcTaskExecutor实例,用于执行单次长耗时任务
// 下面是创建一个计算从1加到100的任务,总共100个任务,采用fork分治算法,阈值为10,总共任务为100 / 10 * 2 = 20个大任务,执行速率约为10/s
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
.taskSource(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList())) // 设置任务源
.taskProcessor(x -> x.stream().reduce(0, Integer::sum)) // 设置任务处理方法
.taskMerger(Integer::sum) // 设置结果处理方法(可选)
.threshold(10) // 设置任务处理阈值(可选)
.taskName("mmcTaskExample") // 设置任务名称
.rateLimiter(10, 20) // 设置速率限制,容量为10,每秒产生令牌为20,休眠时间为10ms
.forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4
.build();
// 同步执行并打印结果
System.out.println("result: " + mmcTaskExecutor.execute());
// 任务执行过程监控
[mmcTaskExample] Tasks submitted. Total tasks: 100
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 20, remaining tasks: 80
[mmcTaskExample] Task started. Completed tasks: 26, remaining tasks: 74
[mmcTaskExample] Task started. Completed tasks: 33, remaining tasks: 67
[mmcTaskExample] Task started. Completed tasks: 40, remaining tasks: 60
[mmcTaskExample] Task started. Completed tasks: 46, remaining tasks: 54
[mmcTaskExample] Task started. Completed tasks: 52, remaining tasks: 48
[mmcTaskExample] Task started. Completed tasks: 58, remaining tasks: 42
[mmcTaskExample] Task started. Completed tasks: 64, remaining tasks: 36
[mmcTaskExample] Task started. Completed tasks: 70, remaining tasks: 30
[mmcTaskExample] Task started. Completed tasks: 76, remaining tasks: 24
[mmcTaskExample] Task started. Completed tasks: 82, remaining tasks: 18
[mmcTaskExample] Task started. Completed tasks: 88, remaining tasks: 12
[mmcTaskExample] Task started. Completed tasks: 94, remaining tasks: 6
[mmcTaskExample] Task started. Completed tasks: 100, remaining tasks: 0
[mmcTaskExample] Tasks completed. Elapsed time: 1107 ms
result: 5050
二、第十一轮对话-修正运行时数据
1、上文提到,它在MmcTask来初始化运行时数据,这明显是不对的。TaskRuntime 应该从MmcTaskExecutor中首次提交任务时初始化并透传。
2、以下是AI的回复。
- 【抱歉,我理解了您的问题。我们可以将TaskRuntime的初始化从MmcTask构造函数中移除,并将其传递给MmcTask。这样,在任务fork时,TaskRuntime将保持不变。首先,我们需要修改MmcTask类以接收TaskRuntime作为参数。】
import java.util.List;
import java.util.concurrent.RecursiveTask;
public class MmcTask<T, R> extends RecursiveTask<R> {
// ...省略其他成员变量和方法
private TaskRuntime taskRuntime;
public MmcTask(Builder<T, R> builder) {
// ...省略其他初始化代码
this.taskRuntime = builder.taskRuntime;
}
@Override
protected R compute() {
if (end - start <= threshold) {
// 在处理任务之前调用onTaskStarted方法
taskListener.onTaskStarted(taskRuntime);
// ...省略其他代码
// 在任务完成后更新已完成任务的计数
taskRuntime.getCompletedTasks().incrementAndGet();
taskRuntime.getRemainingTasks().decrementAndGet();
} else {
// ...省略其他代码
MmcTask<T, R> leftTask = new Builder<T, R>()
.taskSource(taskSource)
.taskProcessor(taskProcessor)
.taskMerger(taskMerger)
.threshold(threshold)
.start(start)
.end(mid)
.rateLimiter(rateLimiter)
.taskListener(taskListener)
.taskRuntime(taskRuntime)
.build();
MmcTask<T, R> rightTask = new Builder<T, R>()
.taskSource(taskSource)
.taskProcessor(taskProcessor)
.taskMerger(taskMerger)
.threshold(threshold)
.start(mid)
.end(end)
.rateLimiter(rateLimiter)
.taskListener(taskListener)
.taskRuntime(taskRuntime)
.build();
// ...省略其他代码
}
}
public static class Builder<T, R> {
// ...省略其他成员变量
private TaskRuntime taskRuntime;
// ...省略其他方法
public Builder<T, R> taskRuntime(TaskRuntime taskRuntime) {
this.taskRuntime = taskRuntime;
return this;
}
public MmcTask<T, R> build() {
return new MmcTask<>(this);
}
}
}
- 【接下来,我们需要修改MmcTaskExecutor类以在创建MmcTask时传递TaskRuntime。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class MmcTaskExecutor<T, R> {
// ...省略其他成员变量和方法
// 提交新任务
public R submitTask(MmcTask<T, R> mmcTask) {
// 调用onTasksSubmitted方法
taskListener.onTasksSubmitted(mmcTask.getTaskRuntime(), System.currentTimeMillis());
R result = forkJoinPool.invoke(mmcTask);
// 调用onTasksCompleted方法
taskListener.onTasksCompleted(mmcTask.getTaskRuntime(), mmcTask.getElapsedTime(), System.currentTimeMillis());
return result;
}
// 同步执行并返回结果
public R execute() {
TaskRuntime taskRuntime = new TaskRuntime(taskName, taskSource.size());
MmcTask<T, R> mmcTask = new MmcTask.Builder<T, R>()
// ...省略其他方法调用
.taskRuntime(taskRuntime)
.build();
return submitTask(mmcTask);
}
}
三、修正任务计数器
AI这次修改基本上符合我的诉求,现在就剩下计数器问题了。上文我提到过,AI理解的累计完成任务个数和我理解的任务个数不一样,可能跟我的提示词表达方式有关(有根更好表达方式的同学可以评论区回复留言),这里我就不纠结了,直接给他修正一下。
1、监听器的默认实现方式。
package com.mmc.juc;
public class DefaultMmcTaskListener implements MmcTaskListener {
@Override
public void onTasksSubmitted(TaskRuntime taskRuntime) {
System.out.println(
"[" + taskRuntime.getTaskName() + "] Tasks submitted. Total tasks: " + taskRuntime.getTotalTasks());
}
@Override
public void onTaskStarted(TaskRuntime taskRuntime) {
System.out.println(
"[" + taskRuntime.getTaskName() + "] Task started. Completed tasks: " + taskRuntime.getCompletedTasks()
.get() + ", remaining tasks: " + taskRuntime.getRemainingTasks().get());
}
@Override
public void onTasksCompleted(TaskRuntime taskRuntime, long elapsedTime) {
System.out.println(
"[" + taskRuntime.getTaskName() + "] Tasks completed. Elapsed time: " + elapsedTime + " ms ");
}
}
2、修正compute方法,修正计数器计数方式。
package com.mmc.juc;
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Data;
import lombok.Getter;
import lombok.experimental.Accessors;
@Getter
public class MmcTask<T, R> extends RecursiveTask<R> {
// 省略其他代码
@Override
protected R compute() {
// 在处理任务之前尝试消耗令牌
while (!rateLimiter.tryConsume()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (end - start <= threshold) {
R result = taskProcessor.process(taskSource.subList(start, end));
// 在任务完成后更新已完成任务的计数
taskRuntime.getCompletedTasks().addAndGet(end - start);
taskRuntime.getRemainingTasks().set(taskRuntime.getTotalTasks() - taskRuntime.getCompletedTasks().get());
// 调用onTaskStarted方法,以便在每个小任务完成时更新已完成任务的计数
taskListener.onTaskStarted(taskRuntime);
return result;
}
int middle = (start + end) / 2;
MmcTask<T, R> leftTask = new MmcTask.Builder<T, R>()
.taskSource(taskSource)
.taskProcessor(taskProcessor)
.taskMerger(taskMerger)
.threshold(threshold)
.start(start)
.end(middle)
.rateLimiter(rateLimiter)
.taskListener(taskListener)
.taskName(taskName)
.taskRuntime(taskRuntime)
.build();
MmcTask<T, R> rightTask = new MmcTask.Builder<T, R>()
.taskSource(taskSource)
.taskProcessor(taskProcessor)
.taskMerger(taskMerger)
.threshold(threshold)
.start(middle)
.end(end)
.rateLimiter(rateLimiter)
.taskListener(taskListener)
.taskName(taskName)
.taskRuntime(taskRuntime)
.build();
leftTask.fork();
R rightResult = rightTask.compute();
R leftResult = leftTask.join();
if (taskMerger != null) {
return taskMerger.merge(leftResult, rightResult);
} else {
return null;
}
}
}
四、第十二轮对话-生成单元测试
1、我们要求AI生成单元测试,验证整个框架计算逻辑是否准确。
2、以下是AI的回复。
- 【为了验证整个框架的计算逻辑、监听器、限流和执行方式的准确性,我们可以创建一个单元测试。在这个测试中,我们将使用1加到100的任务作为示例。以下是一个单元测试的示例:】
package com.mmc.juc;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;
public class MmcTaskExecutorTest {
@Test
public void testMmcTaskExecutor() {
// 创建一个任务源,包含从1到100的整数
List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
// 创建一个任务处理器,将每个整数相加
MmcTaskProcessor<Integer, Integer> taskProcessor = (integer) -> integer.stream().mapToInt(Integer::intValue).sum();
// 创建一个任务合并器,将所有整数的和合并
MmcTaskMerger<Integer> taskMerger = Integer::sum;
// 创建一个任务监听器,打印任务执行过程中的相关信息
MmcTaskListener taskListener = new DefaultMmcTaskListener();
// 创建一个MmcTaskExecutor实例
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = new MmcTaskExecutor.Builder<Integer, Integer>()
.taskSource(taskSource)
.taskProcessor(taskProcessor)
.taskMerger(taskMerger)
.taskListener(taskListener)
.rateLimiter(10, 10) // 每秒处理10个任务
.taskName("SumTask")
.build();
// 执行任务并获取结果
Integer result = mmcTaskExecutor.execute();
// 验证结果是否正确(1到100的和等于5050)
assertEquals(5050, result.intValue());
}
}
五、验证通过
1、我们运行一下单元测试,看看效果。
[SumTask] Tasks submitted. Total tasks: 100
[SumTask] Task started. Completed tasks: 7, remaining tasks: 87
[SumTask] Task started. Completed tasks: 13, remaining tasks: 87
[SumTask] Task started. Completed tasks: 19, remaining tasks: 81
[SumTask] Task started. Completed tasks: 26, remaining tasks: 74
[SumTask] Task started. Completed tasks: 32, remaining tasks: 68
[SumTask] Task started. Completed tasks: 38, remaining tasks: 62
[SumTask] Task started. Completed tasks: 44, remaining tasks: 56
[SumTask] Task started. Completed tasks: 50, remaining tasks: 50
[SumTask] Task started. Completed tasks: 56, remaining tasks: 44
[SumTask] Task started. Completed tasks: 62, remaining tasks: 38
[SumTask] Task started. Completed tasks: 68, remaining tasks: 32
[SumTask] Task started. Completed tasks: 74, remaining tasks: 26
[SumTask] Task started. Completed tasks: 80, remaining tasks: 20
[SumTask] Task started. Completed tasks: 87, remaining tasks: 13
[SumTask] Task started. Completed tasks: 93, remaining tasks: 7
[SumTask] Task started. Completed tasks: 100, remaining tasks: 0
[SumTask] Tasks completed. Elapsed time: 2161 ms
很完美!整个框架运行正常!下一篇,我们将发布这个小巧玲珑的框架,让全世界都看到他,作为第一个版本的里程碑!
七、参考文章
- 《AI大模型编写多线程并发框架(六十一):从零开始搭建框架》
- 《AI大模型编写多线程并发框架(六十二):限流和并发度优化》
- 《AI大模型编写多线程并发框架(六十三):监听器优化·上》
- 《AI大模型编写多线程并发框架(六十四):监听器优化·下》
- 《AI大模型编写多线程并发框架(六十五):发布和应用》
加我加群(备注csdn)一起交流学习!更多干货下载、项目源码和大厂内推等着你