Android Studio: RxJava如何取消订阅
一、事件流如何导致内存泄漏
上一篇提到过Observable.interval()是
定期发射事件,Activity
关闭后它还在运行,导致内存泄漏。
public class MyActivity extends AppCompatActivity {
private Subscription subscription;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
subscription = Observable.interval(1, TimeUnit.SECONDS) // 每秒发射一个事件
.subscribe(aLong -> Log.d("RxJava", "收到事件:" + aLong));
}
@Override
protected void onDestroy() {
super.onDestroy();
// ❌ 忘记取消订阅,Activity 可能会泄漏
}
}
Observable.interval()
是 RxJava 提供的一个工厂方法,用于创建一个发射定时事件的 Observable
。它的作用是每隔一定的时间间隔发射一个数字。类似于java中的定时任务。
subscribe()
是 Observable
的方法,它用于订阅事件流,并定义事件到达后的处理逻辑。通过 subscribe()
来订阅这个 Observable
,并定义了处理逻辑:每次收到事件时,输出日志,显示事件的数字。
在 Logcat
中,你会看到类似以下的输出:
收到事件:0
收到事件:1
收到事件:2
收到事件:3
事件会不断发射,直到你主动取消订阅或者应用程序退出。
如何单独对订阅进行取消:
单独对 subscription
进行取消订阅,可以在 onDestroy()
方法中手动调用 unsubscribe()。
public class MyActivity extends AppCompatActivity {
private Subscription subscription; // RxJava 1.x
// private Disposable disposable; // RxJava 2.x 及以上用 Disposable
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
subscription = Observable.interval(1, TimeUnit.SECONDS) // 每秒发射一个事件
.subscribe(aLong -> Log.d("RxJava", "收到事件:" + aLong));
}
@Override
protected void onDestroy() {
super.onDestroy();
if (subscription != null && !subscription.isUnsubscribed()) {
subscription.unsubscribe(); // 取消订阅,防止内存泄漏
}
}
}
二、订阅管理工具
虽然可以手动对Observable一个个取消订阅,但是
一个 Activity
可能会有多个 Observable
,手动管理多个 Subscription
代码复杂,而 CompositeSubscription
可以批量管理。
订阅管理工具:
下面的类确保可以在合适的时机取消订阅,避免内存泄漏。它相当于一个订阅管理工具,用于集中管理多个 Subscription
,方便在 Activity
或 Fragment
销毁时统一清理。
public abstract class ICompositeSubscription {
CompositeSubscription mCompositeSubscription;
public ICompositeSubscription(){
mCompositeSubscription = new CompositeSubscription();
}
/**
* 添加到订阅管理
* @param subscription
*/
public Subscription putSubscription(Subscription subscription){
mCompositeSubscription.add(subscription);
return subscription;
}
/**
* 取消订阅
*/
public void unSubscribe(){
if(null == mCompositeSubscription){
return;
}
mCompositeSubscription.clear();
}
}
CompositeSubscription
作用是管理多个 Subscription。它的内部是一个集合,可以存储多个 Subscription
,并且可以统一清理。
public Subscription putSubscription(Subscription subscription){
mCompositeSubscription.add(subscription);
return subscription;
}
- 传入一个
Subscription
,并把它添加到mCompositeSubscription
中进行管理。 - 这样,所有订阅都可以集中管理,方便后续取消。
public void unSubscribe(){
if(null == mCompositeSubscription){
return;
}
mCompositeSubscription.clear();
}
clear()
方法会取消所有的订阅,防止Observer
持续监听,导致内存泄漏。- 通常在
Activity
或Fragment
销毁时调用此方法,释放资源。
三、如何使用订阅管理工具
subscriptionManager.unSubscribe();
可以自动清理所有订阅,不会造成内存泄漏。
public class MyActivity extends AppCompatActivity {
private ICompositeSubscription subscriptionManager = new ICompositeSubscription();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS) // 每秒发射一个事件
.subscribe(aLong -> Log.d("RxJava", "收到事件:" + aLong));
// 添加到订阅管理
subscriptionManager.putSubscription(subscription);
}
@Override
protected void onDestroy() {
super.onDestroy();
// ✅ 取消所有订阅,避免内存泄漏
subscriptionManager.unSubscribe();
}
}
上面只有一个订阅的情况下,ICompositeSubscription
并没有明显的优势,直接手动调用 unsubscribe()
/ dispose()
就可以了。
📌 ICompositeSubscription
适用的场景
它的作用主要体现在多个订阅的管理上,比如:
- 多个 Observable 同时订阅
- 某些订阅需要随时添加和移除
- 避免手动管理多个 Subscription
假设现在有多个订阅:
public class MyActivity extends AppCompatActivity {
private ICompositeSubscription subscriptionManager = new ICompositeSubscription();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
// 订阅 1:定时任务
Subscription subscription1 = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(aLong -> Log.d("RxJava", "收到事件 1:" + aLong));
// 订阅 2:用户输入搜索事件
Subscription subscription2 = RxBus.toObservable(SearchEvent.class)
.subscribe(event -> Log.d("RxJava", "搜索事件:" + event.getQuery()));
// 订阅 3:网络请求
Subscription subscription3 = api.getUserInfo()
.subscribe(user -> Log.d("RxJava", "用户信息:" + user.getName()));
// 统一管理订阅
subscriptionManager.putSubscription(subscription1);
subscriptionManager.putSubscription(subscription2);
subscriptionManager.putSubscription(subscription3);
}
@Override
protected void onDestroy() {
super.onDestroy();
// 统一取消所有订阅,防止内存泄漏
subscriptionManager.unSubscribe();
}
}
ICompositeSubscription
适用于管理多个订阅,如果只有一个订阅,直接手动管理就足够了!