当前位置: 首页 > article >正文

Kotlin中RxJava用法

RxJava 是一个基于观察者模式的响应式编程库,广泛用于处理异步事件流。Kotlin 与 RxJava 结合使用可以简化异步编程和事件处理。以下是一些常见的 RxJava 用法示例。
配置文件中引用:

 implementation("io.reactivex.rxjava3:rxjava:3.0.2");
    implementation("io.reactivex.rxjava3:rxandroid:3.0.2") // 如果你在 Android 项目中使用

各种用法代码如下:

fun main() {
    //1.各种创建
     Observable.create<Int> { emitter ->
            for (i in 1..10) {
                emitter.onNext(i)
            }
            emitter.onComplete()

    }.subscribe(
         { item -> println(item) },
          { error -> println(error) },
         { println("complete") }
    )//打印:1 2 3 4 5 6 7 8 9 10 complete 共11行

    //2.just创建
    val observable = Observable.just(1, 2, 3, 4, 5)
    observable.subscribe { println(it) } //打印:1 2 3 4 5 共5行
    //3.fromArray创建
    val observable2 = Observable.fromArray(1, 2, 3, 4, 5)
    observable2.subscribe { println(it) } //打印:1 2 3 4 5 共5行
    //4.range创建
    val observable3 = Observable.range(1, 5)
    observable3.subscribe { println(it) } //打印:1 2 3 4 5 共5行
    //5.interval创建
    val observable4 = Observable.interval(1, TimeUnit.SECONDS)
    observable4.subscribe { println(it) } //每隔1秒打印一次数字
    //6.timer创建
    val observable5 = Observable.timer(1, TimeUnit.SECONDS)
    observable5.subscribe { println(it) } //1秒后打印数字0
    //7.error创建
    val observable6 = Observable.error<Throwable>(RuntimeException("error"))
    observable6.subscribe ({item->println(item)},{e -> println(e)} ) //打印:java.lang.RuntimeException: error
    //8.fromIterable创建
    val list = listOf(1, 2, 3, 4, 5)
    val observable7 = Observable.fromIterable(list)
    observable7.subscribe { println(it) } //打印:1 2 3 4 5 共5行

  //  二.操作符
    //1.map 将发射的数据进行转换。
    val observable8 = Observable.just(1, 2, 3, 4, 5)
    observable8.map { it * 2 }.subscribe { println(it) } //打印:2 4 6 8 10 共5行
    //2.flatMap  将发射的数据进行转换,并且将转换后的数据合并后发射。
    val observable9 = Observable.just(1, 2, 3, 4, 5)
    observable9.flatMap { Observable.just(it * 2) }.subscribe { println(it) } //打印:2 4 6 8 10 共5行
    //3.concatMap 将发射的数据进行转换,并且将转换后的数据合并后发射,但是和flatMap不同的是,concatMap会按照发射的顺序来发射数据。
    val observable10 = Observable.just(1, 2, 3, 4, 5)
    observable10.concatMap { Observable.just(it * 2) }.subscribe { println(it) } //打印:2 4 6 8 10 共5行
    //4.concat 将多个Observable发射的数据按照顺序合并后发射。
    val observable11 = Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
    observable11.subscribe { println(it) } //打印:1 2 3 4 5 6 共6行
    //5.merge 将多个Observable发射的数据合并后发射。
    val observable12 = Observable.merge(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
    observable12.subscribe { println(it) } //打印:1 2 3 4 5 6 共6行
    //6.zip 将多个Observable发射的数据按照顺序合并后发射,并且合并后的数据数量为发射的数据数量最少的Observable。
    val observable13 = Observable.zip(Observable.just(1, 2, 3), Observable.just(4, 5, 6,7)){ a, b -> a + b }
    observable13.subscribe { println(it) } //打印:5 7 9 共3行
    //7.filter 过滤发射的数据,只有满足条件的数据才会被发射。
    val observable14 = Observable.just(1, 2, 3, 4, 5)
    observable14.filter { it % 2 == 0 }.subscribe { println(it) } //打印:2 4 共2行
    //8.take 只发射前n个数据。
    val observable15 = Observable.just(1, 2, 3, 4, 5)
    observable15.take(3).subscribe { println(it) } //打印:1 2 3 共3行
    //9.skip 跳过前n个数据。
    val observable16 = Observable.just(1, 2, 3, 4, 5)
    observable16.skip(3).subscribe { println(it) } //打印:4 5 共2行
    //10.takeWhile 只发射满足条件的数据。
    val observable17 = Observable.just(1, 2, 3, 4, 5)
    observable17.takeWhile { it < 4 }.subscribe { println(it) } //打印:1 2 3 共3行
    //11.skipWhile 跳过满足条件的数据。
    val observable18 = Observable.just(1, 2, 3, 4, 5)
    observable18.skipWhile { it < 4 }.subscribe { println(it) } //打印:4 5 共2行
    //12.takeUntil 只发射不满足条件的数据。
    val observable19 = Observable.just(1, 2, 3, 4, 5)
    observable19.takeUntil { it == 4 }.subscribe { println(it) } //打印:1 2 3 共3行
    //13.takeLast 只发射最后n个数据。
    val observable21 = Observable.just(1, 2, 3, 4, 5)
    observable21.takeLast(3).subscribe { println(it) } //打印:3 4 5 共3行
    //14.distinct 去重,只发射第一次出现的数据。
    val observable22 = Observable.just(1, 2, 3, 4, 5, 1, 2, 3)
    observable22.distinct().subscribe { println(it) } //打印:1 2 3 4 5 共5行
    // 三.线程调度
    //1.subscribeOn 指定Observable执行的线程。
    val observable23 = Observable.just(1, 2, 3, 4, 5)
    observable23.subscribeOn(Schedulers.io()).subscribe { println(it) } //打印:1 2 3 4 5 共5行
    //2.observeOn 指定Observer执行的线程。
    val observable24 = Observable.just(1, 2, 3, 4, 5)
    observable24.observeOn(Schedulers.computation()).subscribe { println(it) } //打印:1 2 3 4 5 共5行

    // 四.错误处理
    Observable.error<Throwable>(RuntimeException("Error occurred"))
        .subscribe(
            { println("OnNext: $it") },
            { println("OnError: ${it.message}") },
            { println("OnComplete") }
        ) //打印:OnError: Error occurred



}

五.处理生命周期

class MyActivity : AppCompatActivity() {

    private val compositeDisposable = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        val disposable = Observable.just(1, 2, 3)
            .subscribe { println(it) }

        compositeDisposable.add(disposable)
    }

    override fun onDestroy() {
        super.onDestroy()
        compositeDisposable.clear() // 取消所有订阅
    }
}

http://www.kler.cn/a/569967.html

相关文章:

  • 测试工程师的DeepSeek提效2:自动化测试应用
  • android TabLayout设置tab的时候文字默认居中,选中文字加粗
  • 微信小程序接入DeepSeek模型(火山方舟),并在视图中流式输出
  • PostgreSQL全页写机制深度解析:如何平衡WAL性能与数据可靠性
  • IDEA Tab 页设置多行显示
  • Docker + Vue2 热重载:为什么需要 CHOKIDAR_USEPOLLING=true?
  • 23种设计模式之工厂方法模式(Factory Method Pattern)【设计模式】
  • 频谱泄露与加窗
  • 代码规范和简化标准
  • Java零基础入门笔记:(6)面向对象
  • Cherno C++ P60 为什么不用using namespace std
  • react 19版中路由react-router-dom v7版的使用
  • MySQL学习笔记(2)并发问题与事务隔离级别
  • kettle插件-git/svn版本管理插件
  • 实战 Elasticsearch:快速上手与深度实践-2.2.2线程池配置与写入限流
  • 乡村研学旅行小程序(论文源码调试讲解)
  • 【JavaScript】《JavaScript高级程序设计 (第4版) 》笔记-附录A-ES2018 和 ES2019
  • 芯麦GC1272与茂达APX9172驱动芯片技术对比及替代方案解析 ——以电脑散热风扇、工业风机及智能设备场景为例
  • Kaldi环境配置与Aishell训练
  • 解决:org.springframework.web.multipart.support.MissingServletRequestPartException