Flutter中stream学习
Flutter中stream学习
- 概述
- Stream的基础概念
- stream的常用方法
- Stream.fromFuture(Future<T> future)
- Stream.fromFutures(Iterable<Future<T>> futures)
- Stream.fromIterable(Iterable<T> elements)
- Stream.periodic(Duration period, [T computation(int computationCount)?])
- Stream<T> take(int count)
- Stream<T> takeWhile(bool test(T element))
- Stream<T> where(bool test(T event))
- Stream<T> distinct([bool equals(T previous, T next)])
- Stream<T> skip(int count)
- Stream<T> skipWhile(bool test(T element))
- Stream<S> map<S>(S convert(T event))
- Stream<S> expand<S>(Iterable<S> convert(T element))
- Stream的分类
- 单订阅
- 广播订阅
概述
Stream 主要应用于 Flutter 的异步操作,在其他编程语言中也存在;Stream 提供了一种接受事件队列的方法,可通过 listen 进行数据监听,通过 error 接收失败状态,通过 done 来接收结束状态;
Stream的基础概念
Stream
:表示一个可以接收异步事件的数据源。可以生成一个或多个值。StreamController
:控制Stream,可以向其添加事件、错误以及关闭它。StreamSubscription
:表示对Stream的监听,可以用来取消订阅。Sink
:用来向Stream添加数据、错误、以及关闭。
stream的常用方法
Stream.fromFuture(Future future)
Stream
通过Future
对象创建新的单订阅流, 当Future
对象完成时会触发 data / error,
然后已done
事件结束
Future<String> getDate() async {
await Future.delayed(const Duration(seconds: 3));
return "当前时间为${DateTime.now()}";
}
void testStreamFromFuture() {
Stream.fromFuture(getDate()).listen((event) {
print("testStreamFromFuture============$event");
}).onDone(() {
print("testStreamFromFuture==========done 结束");
});
}
输出结果:
Stream.fromFutures(Iterable<Future> futures)
Stream 通过一系列的 Future 创建新的单订阅流,每个 Future 都会有自身的 data / error 事件, 当这一系列的 Future 均完成时,Stream 以 done 事件结束;若 Futures 为空,则 Stream 会立刻关闭;其分析源码,很直接的看到是将每一个 Future 事件监听完之后才会执行的微事件结束;
源码代码:
factory Stream.fromFutures(Iterable<Future<T>> futures) {
_StreamController<T> controller =
new _SyncStreamController<T>(null, null, null, null);
int count = 0;
// Declare these as variables holding closures instead of as
// function declarations.
// This avoids creating a new closure from the functions for each future.
void onValue(T value) {
if (!controller.isClosed) {
controller._add(value);
if (--count == 0) controller._closeUnchecked();
}
}
void onError(Object error, StackTrace stack) {
if (!controller.isClosed) {
controller._addError(error, stack);
if (--count == 0) controller._closeUnchecked();
}
}
// The futures are already running, so start listening to them immediately
// (instead of waiting for the stream to be listened on).
// If we wait, we might not catch errors in the futures in time.
for (var future in futures) {
count++;
future.then(onValue, onError: onError);
}
// Use schedule microtask since controller is sync.
if (count == 0) scheduleMicrotask(controller.close);
return controller.stream;
}
示例代码:
var datas = [getDate(), getDate(), getDate()];
Stream.fromFutures(datas).listen((event) {
print("testStreamFromFutures============$event");
}).onDone(() {
print("testStreamFromFutures==========done 结束");
});
输出结果:
Stream.fromIterable(Iterable elements)
Stream 通过数据集合中获取并创建单订阅流,通过 listen 监听迭代器中每一个子 element,当 Stream 监听到取消订阅或 Iterator.moveNext 返回 false / throw 异常 时停止迭代;
void testStreamFromIterable() {
var datas = [1, 2, "5.toStroing", false, 9];
Stream.fromIterable(datas).listen((event) {
print("testStreamFromIterable============$event");
}).onDone(() {
print("testStreamFromIterable==========done 结束");
});
}
输出结果:
Stream.periodic(Duration period, [T computation(int computationCount)?])
Stream 通过 Duration 对象作为参数创建一个周期性事件流,其中若不设置 computation 时 onData 获取数据为 null;若没有事件结束则会一直周期性执行; 因为 computation 函数是返回流的结果
void testStreamPeriodic() {
Stream.periodic(const Duration(seconds: 1)).listen((event) {
print("testStreamPeriodic===没有computation==================$event");
});
Stream.periodic(const Duration(seconds: 1), (x) => x).listen((event) {
print("testStreamPeriodic---- listen========$event");
}).onDone(() {
print("testStreamPeriodic==========done 结束");
});
}
输出结果:
Stream take(int count)
take() 对于单订阅方式,可以提供 take 设置之前的 Stream 订阅数据,例如设置中断 Stream.periodic 周期展示次数;小菜粗略理解为 take 可以作为中断订阅, 如果 take 设置次数大于 onDone 之前的订阅数据次数,Stream 依旧获取所有 onDone 之前的订阅数据
void testStreamTake() {
Stream.periodic(const Duration(seconds: 1), (x) => x)
.take(5) // 如果不设置这个, 这个流将一直会执行, 但是设置之后只会执行设置的数的次数
.listen((event) {
print("testStreamTake===========$event");
}).onDone(() {
print("testStreamTake==============done 结束");
});
}
输出结果:
Stream takeWhile(bool test(T element))
takeWhile
也可以实现上述take
方法相同效果, 返回一个 boolean 类型,如果为 false 则中断订阅
void testStreamTakeWhile() {
Duration interval = const Duration(seconds: 1);
Stream<int> streamData = Stream<int>.periodic(interval, (data) => data);
streamData.takeWhile((element) {
print('Stream.periodic.takeWhile -> $element');
return element < 5;
}).listen((event) {
print('Stream.periodic -> $event');
}).onDone(() {
print('Stream.periodic -> done 结束');
});
}
输出结果:
Stream where(bool test(T event))
where 用于在当前 Stream 中创建一个新的 Stream 用来丢弃不符合 test 的数据;简单理解为类似数据库查询一样,仅过滤符合需求的数据流;且 where 可以设置多次
void testStreamWhere() {
Stream.periodic(const Duration(seconds: 1), (data) => data)
.takeWhile((element) => element <= 5)
.where((event) {
print('Stream.periodic.where -> $event');
return event > 3;
}).listen((event) {
print("testStreamWhere==================$event");
}).onDone(() {
print("testStreamWhere===================== done 结束");
});
}
输出结果:
Stream distinct([bool equals(T previous, T next)])
作用:相邻的两个数据去重哈
void testStreamDistinct() {
var datas = [1, 2, '3.toString()', true, true, false, true, 6];
Stream.fromIterable(datas).distinct().listen((event) {
print("testStreamDistinct===========================$event");
}).onDone(() {
print('testStreamDistinct============================ done 结束');
});
}
输出结果:
Stream skip(int count)
作用: skip 用于跳过符合条件的订阅数据次数 count: 跳过的次数;
void testStreamSkip() {
Stream<int> streamData =
Stream<int>.periodic(const Duration(seconds: 1), (data) => data + 1);
streamData
.takeWhile((element) {
print('Stream.periodic.takeWhile -> $element');
return element <= 6;
})
.where((event) {
print('Stream.periodic.where -> $event');
return event > 2;
})
.skip(2)
.listen((event) {
print('Stream.periodic -> $event');
})
.onDone(() {
print('Stream.periodic -> done 结束');
});
}
输出结果 :
Stream skipWhile(bool test(T element))
skipWhile 用于跳过在 where 符合条件下满足设置 条件的订阅数据;即当 返回 为 true 时跳过当前订阅数据监听;
void testSkipWhile() {
Stream.periodic(const Duration(seconds: 1), (data) => data)
.takeWhile((element) => element < 5)
.skipWhile((element) => element < 3)
.listen((event) {
print("testSkipWhile=========$event");
}).onDone(() {
print("testSkipWhile========done 结束");
});
}
输出 结果:
Stream map(S convert(T event))
在当前 Stream 基础上创建一个新的 Stream 并对当前 Stream 进行数据操作,onData 监听到的是 map 变更后的新的数据流;
void testStreamMap() {
// 创还能一个stream刘
Stream<int> streamData =
Stream<int>.periodic(const Duration(seconds: 1), (data) => data + 1);
streamData.takeWhile((element) {
print('Stream.periodic.takeWhile -> $element');
return element < 5;
}).map((event) {
print('Stream.periodic.map -> $event -> ${event * 100}');
return event * 100;
}).listen((event) {
print('Stream.periodic -> $event');
}).onDone(() {
print('Stream.periodic -> done 结束');
});
}
输出结果:
Stream expand(Iterable convert(T element))
在当前 Stream 基础上创建新的 Stream 并将当前订阅数据转为新的订阅数据组,onData 监听 数据组 中每个新的订阅数据元素;
void testStreamExpand() {
Stream<int> streamData =
Stream<int>.periodic(const Duration(seconds: 1), (data) => data + 1);
streamData.takeWhile((element) {
print('Stream.periodic.takeWhile -> $element');
return element <= 6;
}).expand((element) {
print(
'Stream.periodic.expand -> $element -> ${element * 10} -> ${element * 100}');
return [element, element * 10, element * 100];
}).listen((event) {
print('Stream.periodic -> $event');
}).onDone(() {
print('Stream.periodic -> done 结束');
});
}
输出结果:
Stream的分类
单订阅
默认情况下Streams会被设置成单订阅,点订阅会保持当前的值,直到有其它的订阅。
单订阅Stream(Single-Subscription Stream)一次只能有一个监听器(listener),当我们对单订阅进行监听的时候,程序会被错。通常用于一次性事件
void testStreamController() {
// 使用streamController创建一个stream
final streamController = StreamController<int>();
// 获取stream流
final stream = streamController.stream;
// 监听stream
stream.listen((event) {
print("testStreamController========================$event");
});
// stream.listen((event) {
// print("testStreamController=11111=======================$event");
// });
// 添加测试数据到stream
streamController.sink.add(1);
streamController.sink.add(2);
streamController.sink.add(3);
// 关闭stream流
streamController.close();
}
如果我打开上述注释掉的监听, 对一个单订阅的stream进行多次监听会报如下错误:
广播订阅
广播(Broadcast Stream)允许多个监听器,可以同时向多个订阅者推送数据。 这种类型适合用于事件广播,比如用户操作、全局数据推送等。
void testStreamBoardcast() {
final streamController = StreamController<int>.broadcast();
final stream = streamController.stream;
stream.listen((event) {
print("testStreamBoardcast==============$event");
});
stream.listen((event) {
print("testStreamBoardcast111111111===================$event");
});
streamController.sink.add(1);
streamController.sink.add(2);
streamController.sink.add(3);
streamController.close();
}
输出结果:
除此之外Flutter官方还提供了StreamBuilder
这种专门用于监听Stream
并根据数据变化更新UI的Widget
。具体用法可以参考官方文档。