Flutter 异步编程简述
1、isolate 机制
1.1 基本使用
Dart 是基于单线程模型的语言。但是在开发当中我们经常会进行耗时操作比如网络请求,这种耗时操作会堵塞我们的代码。因此 Dart 也有并发机制 —— isolate。APP 的启动入口main
函数就是一个类似 Android 主线程的一个主 isolate。与 Java 的 Thread 不同的是,Dart 中的 isolate 无法共享内存,因此也有称 isolate 是像进程一样的线程。
在 Dart 中,Isolate(隔离区)是一种独立运行的执行单元,它是 Dart 并发模型的基本组成部分。每个 Isolate 都有自己的内存堆,独立于其他 Isolate,并且彼此之间不共享内存。
Isolate 可以并行执行代码,使多个任务可以同时运行而互不干扰。每个 Isolate 都有自己的事件循环(event loop),可以独立地处理事件和执行任务。
通过使用 Isolate,您可以将应用程序的工作负载分发到多个 Isolate 中,从而实现并发处理和利用多核处理器的能力。
每个 isolate 都有一个 ReceivePort,ReceivePort 内又有一个 SendPort,可以将 SendPort 发送给对端的 isolate,这样就能实现两个 isolate 彼此通信了:
示例代码:
import 'dart:io';
import 'dart:isolate';
int i = 0;
void main() {
i = 10;
// 主 isolate 的 ReceivePort
var receivePort = ReceivePort();
// 创建子 isolate,传入主 isolate 的 SendPort
Isolate.spawn(isolateSub, receivePort.sendPort);
// 接收其他 isolate 发来的消息
receivePort.listen((message) {
// 如果对端发过来 sendPort,则主 isolate 也可以向对端的 isolate 发送消息
if (message is SendPort) {
message.send("主 isolate 接收到 SendPort");
} else {
print("接到子 isolate 消息:$message\n");
}
});
// 休眠 5s 测试
sleep(Duration(seconds: 5));
}
/// 新 isolate 的入口函数
void isolateSub(SendPort sendPort) {
// isolate 是内存隔离的,i 的值是在子 isolate 中没有修改,因此为 0
print(i);
// 创建子 isolate 的 SendPort 并发给主 isolate
var receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
// 向主 isolate 发送消息
sendPort.send("子 isolate 发送的消息~");
// 监听其他 isolate 发来的消息
receivePort.listen((message) {
print("接到主isolate消息:$message\n");
});
}
这段代码要注意两个问题:
- ReceivePort 在使用完毕后要通过 close() 关闭掉
- 在 main 中给 receivePort 设置完监听之后 sleep 了 5 秒,目的是测试消息的收发情况,结果是先输出了 isolateSub() 打印的 i 的 0 值,然后隔了 5 秒,主 isolate 和子 isolate 才陆续输出接收到的消息。说明 Dart 真的是单线程
可以将 isolate 看成 Java 线程,只不过线程空间不共享。
1.2 Event-Loop
上面提到,如果在 main() 中休眠 5 秒,那么主与子 isolate 接收消息也会延后 5 秒。这是因为 Dart 与 Android 一样都是事件驱动的,通过 Event-Loop 不停的从队列中获取消息或者事件来驱动整个应用的运行。isolate 发过来的消息就是通过 Event-Loop 处理。但是与 Android 不同的是,Android 中每个线程只有一个 Looper 所对应的 MessageQueue,而 Dart 中有两个队列,一个叫做 Event queue(事件队列),另一个叫做 Microtask queue(微任务队列):
Dart 在执行完 main 函数后,就会由 Loop 开始执行两个任务队列中的 Event:
- 首先 Loop 检查微服务队列,依次执行 Event
- 当微服务队列执行完后,就检查 Event queue 队列依次执行,在执行 Event queue 的过程中,每执行完一个 Event 就再检查一次微服务队列。所以微服务队列优先级高,可以利用微服务进行插队
我们来看几个例子:
import 'dart:io';
void main() {
new File("/Users/enjoy/a.txt").readAsString().then((content) {
print(content);
});
while (true) {}
}
文件内容永远也无法打印出来,因为 main 函数还没执行完。而 then 方法是由 Loop 检查 Event queue 执行的。
void main() {
var receivePort = ReceivePort();
receivePort.listen((message) {
print(message);
});
receivePort.sendPort.send("发送消息1");
Future.microtask(() => print("执行微任务1"));
receivePort.sendPort.send("发送消息2");
Future.microtask(() => print("执行微任务2"));
receivePort.sendPort.send("发送消息3");
Future.microtask(() => print("执行微任务3"));
}
输出的结果是:
执行微任务1
执行微任务2
执行微任务3
发送消息1
发送消息2
发送消息3
这是因为微服务队列优先级高,Loop 在 main() 执行完开始处理消息时,先去微服务队列,看到队列中有三个任务就都执行了,然后才去 Event queue 中执行任务,每执行完一个任务还要再去微服务队列中看一下是否有任务要插队进行,在这个例子中没有,所以才接连执行了“发送消息1”、“发送消息2”、“发送消息3”。
2、Future
Future 表示事件队列中一个事件的结果,通常异步函数返回的对象就是一个 Future。当一个 Future 执行完后,他里面的值就可以使用了,可以通过 then() 在 Future 完成时执行其他代码:
void main() {
// readAsString() 返回 Future<String>
File(r"D:\a1.txt").readAsString().then((value) => print(value));
}
2.1 异常处理
当给到一个不存在的文件地址时会发生异常,这时候可以利用 catchError 捕获此异常:
// then().catchError() 模式就是异步的 try-catch
void main() {
File(r"D:\a2.txt")
.readAsString()
.then((value) => print(value))
.catchError((e, s) {
print(e);
});
}
会打印输入如下信息:
PathNotFoundException: Cannot open file, path = 'D:\a2.txt' (OS Error: 系统找不到指定的文件。
, errno = 2)
2.2 组合
then()
的返回值同样是一个 Future 对象,可以利用队列的原理进行组合异步任务:
void main() {
File(r"D:\a1.txt").readAsString().then((value) {
print(value);
// 1 被转化为 Future<int> 类型返回
return 1;
}).then((value) => print(value));
}
上面是等待执行完成读取文件之后,再执行一个新的 Future。如果我们需要等待一组任务都执行完再统一处理一些事情,可以通过wait()
完成:
var readFuture = File(r"D:\a1.txt").readAsString();
var delayedFuture = Future.delayed(const Duration(seconds: 3));
Future.wait([readFuture, delayedFuture]).then((value) {
print(value[0]); // 第一个 Future 的结果,即文件中的字符串
print(value[1]); // 第二个 Future 的结果,null
});
3、Stream
Stream,也就是流,表示发出的一系列的异步数据。Stream 是一个异步数据源,它是 Dart 中处理异步事件流的统一 API。
3.1 基本使用
Future 表示稍后获得的一个数据,所有异步的操作的返回值都用 Future 来表示。但是 Future 只能表示一次异步获得的数据。而 Stream 表示多次异步获得的数据。比如 IO 处理的时候,每次只会读取一部分数据和一次性读取整个文件的内容相比,Stream 的好处是处理过程中内存占用较小。而 File 的 readAsString()
是一次性读取整个文件的内容进来,虽然获得完整内容处理起来比较方便,但是如果文件很大的话就会导致内存占用过大的问题。
new File("/Users/enjoy/app-release.apk").openRead().listen((List<int> bytes) {
print("stream执行"); // 执行多次
});
new File("/Users/enjoy/app-release.apk").readAsBytes().then((_){
print("future执行"); // 执行1次
});
以读取文件内容为例,如果文件太大不足以一次读取完,Stream 就会分多次读取,但是 Future 还是会一次读取完整个文件的内容。
Stream 的 listen() 其实就是订阅这个 Stream,它会返回一个 StreamSubscription,即订阅者。订阅者可以通过 cancel() 取消订阅,通过 onData() 重置 listen(),还有其他可调用方法如下所示:
var streamSubscription =
File(r"D:\a1.txt").openRead().listen((List<int> bytes) {
print("Stream 执行");
});
// 重置 listen 方法
streamSubscription.onData((_) {
print("替代 listen");
});
// 监听结束
streamSubscription.onDone(() {
print("结束");
});
// 发生异常
streamSubscription.onError((e,s){
print("异常");
});
// 暂停,如果没有继续则会退出程序
streamSubscription.pause();
// 恢复
streamSubscription.resume();
// 取消监听
streamSubscription.cancel();
3.2 广播模式
Stream 有单订阅和多订阅两种模式,默认是单订阅,可以通过 Stream.asBroadcastStream() 将单订阅变为多订阅:
var stream = new File("/Users/enjoy/app-release.apk").openRead();
stream.listen((List<int> bytes) {});
// 错误 单订阅只能有一个订阅者
// stream.listen((_){
// print("stream执行");
// });
var broadcastStream = new File("/Users/enjoy/app-release.apk").openRead().asBroadcastStream();
broadcastStream.listen((_){
print("订阅者1");
});
broadcastStream.listen((_){
print("订阅者2");
});
可以通过 isBroadcast 属性判断当前 Stream 所处的模式。
除了使用 Stream.asBroadcastStream() 将已经存在的 Stream 由单订阅变为多订阅之外,也可以使用 StreamController.broadcast() 直接创建一个多订阅的 Stream,只不过这样创建的 Stream,如果不及时添加订阅者可能会丢失数据:
void test() {
// 1.由单订阅转换为多订阅的 Stream 具有粘性
var stream = Stream.fromIterable([1, 2, 3]);
Timer(Duration(seconds: 3), () => stream.listen(print));
// 2.通过 StreamController 创建的 Stream 没有粘性
var streamController = StreamController.broadcast();
streamController.add(1);
// 先发出事件再订阅,无法接到通知
streamController.stream.listen((event) {
print(event);
});
// 关闭
streamController.close();
}
输出结果为 1 2 3
,也就是在 stream 上先发送消息后订阅是可以收到消息的(由单订阅转为多订阅的 Stream 本质上还是单订阅的),但是对 streamController 的 stream 就不行。
4、async/await
当我们需要获得 A 的结果,再执行 B 时,你需要then()->then()
,利用async
与await
能够非常好的解决回调地狱的问题。比如说,读取文件的操作一般要异步执行,但是读取多个文件时有先后顺序,那么就可以将读取操作先放入异步方法中,然后在方法内,对每个读取文件的操作都加上 await 变为同步操作:
/// async 表示这是一个异步方法,await 必须在 async方法中使用
/// 异步方法只能返回 void 或 Future
Future<String> readFile() async {
// await 等待 Future 执行完成再执行后续代码,即阻塞
String content = await File("/Users/a.txt").readAsString();
String content2 = await File("/Users/b.txt").readAsString();
// 自动转换为 future
return "$content$content2";
}
简言之,async 与 await 搭配使用可以将异步变为同步,简化操作(避免回调地狱)。