第15章 监控任务的生命周期(Java高并发编程详解:多线程与系统设计)
1.场景描述
虽然Thread为我们提供了可获取状态, 以及判断是否alive的方法, 但是这些方法均是针对线程本身的, 而我们提交的任务Runnable在运行过程中所处的状态如何是无法直接获得的, 比如它什么时候开始, 什么时候结束, 最不好的一种体验是无法获得Runnable任务执行后的结果。一般情况下想要获得最终结果, 我们不得不为Thread或者Runnable传入共享变量,但是在多线程的情况下,共享变量将导致资源的竞争从而增加了数据不一致性的安全隐患。
2.当观察者模式遇到Thread
当某个对象发生状态改变需要通知第三方的时候,观察者模式就特别适合胜任这样的工作。观察者模式需要有事件源, 也就是引发状态改变的源头, 很明显Thread负责执行任务的逻辑单元,它最清楚整个过程的始末周期,而事件的接收者则是通知接受者一方,严格意义上的观察者模式是需要Observer的集合的, 我们在这里不需要完全遵守这样的规则,只需将执行任务的每一个阶段都通知给观察者即可。
2.1接口定义
1.Observable接口定义
Observable接口定义的代码如所示
public interface Observable {
// 任务生命周期的枚举类型
enum Cycle{
STARTED, RUNNING, DONE, ERROR
}
// 获取当前任务的生命周期状态
Cycle getCycle();
// 定义启动线程的方法,主要作用是为了屏蔽Thread的其他方法
void start();
// 定义线程的打断方法,作用与start方法一样,也是为了屏蔽Thread的其他方法
void interrupt();
}
该接口主要是暴露给调用者使用的,其中四个枚举类型分别代表了当前任务执行生命周期的各个阶段,具体如下
- getCycle() 方法用于获取当前任务处于哪个执行阶段。
- start() 方法的目的主要是为了屏蔽Thread类其他的API, 可通过Observable的start对线程进行启动。
- interrupt(方法的作用与start一样, 可通过Observable的interrupt对当前线程进行中断。
2.TaskLifecycle接口定义
public interface TaskLifecycle<T> {
// 任务启动时会触发onStart方法
void onStart(Thread thread);
// 任务正在运行时会触发onRunning方法
void onRunning(Thread thread);
// 任务运行结束时会触发onFinish方法,其中result是任务执行结束后的结果
void onFinish(Thread thread, T result);
// 任务执行报错时会触发onError方法
void onError(Thread thread, Exception e);
//生命周期接口的空实现(Adapter)
class EmptyLifecycle<T> implements TaskLifecycle<T> {
@Override
public void onStart(Thread thread) {
}
@Override
public void onRunning(Thread thread) {
}
@Override
public void onFinish(Thread thread, T result) {
}
@Override
public void onError(Thread thread, Exception e) {
}
}
}
3.Task函数接口定义
@FunctionalInterface
public interface Task<T> {
// 任务执行接口,该接口允许有返回值
T call();
}
2.2 ObservableThread实现
public class ObservableThread<T> extends Thread implements Observable {
private final TaskLifecycle<T> lifecycle;
private final Task<T> task;
private Cycle cycle;
// 指定Task的实现,默认情况下使用EmptyLifecycle
public ObservableThread(Task<T> task) throws IllegalAccessException {
this(new TaskLifecycle.EmptyLifecycle<>(), task);
}
// 指定TaskLifecycle的同时指定Task
public ObservableThread(TaskLifecycle<T> lifecycle, Task<T> task) throws IllegalAccessException {
super();
// Task不允许为null
if ( task == null )
throw new IllegalAccessException("The task is required.");
this.lifecycle = lifecycle;
this.task = task;
}
@Override
public void run() {
// 在执行线程逻辑单元的时候,分别触发相应的事件
this.update(Cycle.STARTED, null,null);
try {
this.update(Cycle.RUNNING, null, null);
T result = this.task.call();
this.update(Cycle.DONE, result, null);
}catch (Exception e) {
this.update(Cycle.ERROR, null, e);
}
}
private void update(Cycle cycle, T result, Exception e) {
this.cycle = cycle;
if ( lifecycle == null )
return;
try {
switch (cycle) {
case STARTED:
this.lifecycle.onError(currentThread());
break;
case RUNNING:
this.lifecycle.onRunning(currentThread());
break;
case DONE:
this.lifecycle.onFinish(currentThread(), result);
break;
case ERROR:
this.lifecycle.onError(currentThread(), e);
break;
}
}catch (Exception ex) {
if ( cycle == Cycle.ERROR ) {
throw ex;
}
}
}
@Override
public Cycle getCycle() {
return this.cycle;
}
}
重写父类的run方法, 并且将其修饰为final类型, 不允许子类再次对其进行重写, run方法在线程的运行期间,可监控任务在执行过程中的各个生命周期阶段,任务每经过一个阶段相当于发生了一次事件。
update方法用于通知时间的监听者, 此时任务在执行过程中发生了什么, 最主要的通知是异常的处理。如果监听者也就是Task Lifecycle, 在响应某个事件的过程中出现了意外,则会导致任务的正常执行受到影响,因此需要进行异常捕获,并忽略这些异常信息以保证Task Lifecycle的实现不影响任务的正确执行, 但是如果任务执行过程中出现错误并且抛出了异常, 那么update方法就不能忽略该异常, 需要继续抛出异常, 保持与call方法同样的意图。
2.3 测试用例代码实现
测试代码01:
public class TaskClient {
public static void main(String[] args) throws IllegalAccessException {
Observable observable = new ObservableThread<>(
() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finished done.");
return null;
}
);
observable.start();
}
}
测试代码02:
import java.util.concurrent.TimeUnit;
public class TaskClient1 {
public static void main(String[] args) throws IllegalAccessException {
final TaskLifecycle<String> lifecycle = new TaskLifecycle.EmptyLifecycle<String>(){
@Override
public void onFinish(Thread thread, String result) {
System.out.println("The result is " + result);
}
};
Observable observable = new ObservableThread<>(lifecycle, () ->
{
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" finished done.");
return "Hello Observer";
});
}
}