reactivex.Observable 超时问题
下面代码测试可知:超时设置需要在map之后才有效,换句话说就是,超时只对超时设置之前的代码有用
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class TimeoutTest {
public static void main(String[] args) throws InterruptedException {
TimeoutTest test = new TimeoutTest();
System.out.println("=== 测试超时任务 ===");
test.testTimeout();
System.out.println("\n=== 测试正常任务 ===");
test.testNormal();
System.out.println("\n=== 测试长任务 ===");
test.testLongTask();
// 确保主线程不退出
Thread.sleep(5000);
}
// 测试超时任务
public void testTimeout() {
Observable.create(emitter -> {
System.out.println("超时任务模拟:执行开始...");
// Thread.sleep(2000); // 模拟超长时间任务(超过1秒)
emitter.onNext("任务完成");
emitter.onComplete();
})
.map(s->{
Thread.sleep(4000); // 模拟超长时间任务(超过1秒)
System.out.println("处理s0!");
return s + "x";
})
.timeout(3, TimeUnit.SECONDS) // 设置超时时间为1秒
.onErrorResumeNext(throwable -> {
if (throwable instanceof java.util.concurrent.TimeoutException) {
System.out.println("任务处理超时:跳过当前任务!");
} else {
System.err.println("任务发生其他异常:" + throwable.getMessage());
}
return Observable.empty(); // 返回空的Observable,继续处理其他任务
})
.subscribe(
result -> System.out.println("结果: " + result),
throwable -> System.err.println("订阅时异常: " + throwable.getMessage()),
() -> System.out.println("任务已完成")
);
}
// 测试正常任务
public void testNormal() {
Observable.create(emitter -> {
System.out.println("正常任务模拟:执行开始...");
Thread.sleep(500); // 模拟快速任务(小于1秒)
emitter.onNext("任务完成");
emitter.onComplete();
})
.timeout(1, TimeUnit.SECONDS) // 设置超时时间为1秒
.onErrorResumeNext(throwable -> {
System.err.println("任务超时或其他异常:" + throwable.getMessage());
return Observable.empty();
})
.subscribe(
result -> System.out.println("结果: " + result),
throwable -> System.err.println("订阅时异常: " + throwable.getMessage()),
() -> System.out.println("任务已完成")
);
}
// 测试长时间任务
public void testLongTask() {
Observable.create(emitter -> {
System.out.println("长时间任务模拟:执行开始...");
for (int i = 0; i < 5; i++) {
System.out.println("任务进行中: Step " + (i + 1));
Thread.sleep(600); // 模拟分段任务,每次处理600ms
emitter.onNext("Step " + (i + 1));
}
emitter.onComplete();
})
.timeout(1, TimeUnit.SECONDS) // 设置超时时间为1秒
.onErrorResumeNext(throwable -> {
if (throwable instanceof java.util.concurrent.TimeoutException) {
System.out.println("任务处理超时:跳过当前任务!");
} else {
System.err.println("任务发生其他异常:" + throwable.getMessage());
}
return Observable.empty(); // 返回空的Observable,继续处理其他任务
})
.subscribe(
result -> System.out.println("结果: " + result),
throwable -> System.err.println("订阅时异常: " + throwable.getMessage()),
() -> System.out.println("任务已完成")
);
}
}