Android开源库——RxJava和RxAndroid
RxJava和RxAndroid是什么?
RxJava是基于JVM的响应式扩展,用于编写异步代码
RxAndroid是关于Android的RxJava绑定
RxJava和RxAndroid使用
依赖
implementation 'io.reactivex.rxjava3:rxjava:3.1.0'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
使用过程
如下模拟在子线程中进行耗时操作,并将结果返回到主线程中处理
- Flowable:将要进行的操作
- subscribeOn():操作要运行的线程
- observeOn() :处理结果要运行的线程
- subscribe():处理结果
// Build a Flowable whose single value is produced by a blocking Callable
// (the 1 s sleep simulates a time-consuming operation).
Flowable.fromCallable(() -> {
Thread.sleep(1000);
return "Done";
})
// Run the subscription, and therefore the Callable, on the io scheduler.
.subscribeOn(Schedulers.io())
// Deliver the result on the Android main thread.
.observeOn(AndroidSchedulers.mainThread())
// Print the value on success, or the stack trace on failure.
.subscribe(System.out::println, Throwable::printStackTrace);
// Step-by-step form of the pipeline: every operator returns a new Flowable.
// The source Flowable wraps the blocking Callable.
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000);
return "Done";
});
// Subscription (and the Callable) will run on the io scheduler.
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
// Results will be delivered on the Android main thread.
Flowable<String> showForeground = runBackground.observeOn(AndroidSchedulers.mainThread());
// Subscribing starts the work; prints the value or the error stack trace.
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
RxJava源码解析
Publisher
Publisher用于发布数据,Subscriber通过subscribe()订阅数据
// A source of values of type T.
public interface Publisher<T> {
// Registers the given Subscriber to receive this Publisher's signals.
public void subscribe(Subscriber<? super T> s);
}
Subscriber
Subscriber接收Publisher发布的数据
- onSubscribe():subscribe()回调函数,回调前会创建Subscription用于控制数据发布和停止
- onNext():当Subscription调用request()时会调用onNext()发布数据
- onError():处理接收到的错误
- onComplete():处理完成的情况
// Receives the signals emitted by a Publisher.
public interface Subscriber<T> {
// Called with the Subscription that links this Subscriber to its Publisher.
public void onSubscribe(Subscription s);
// Called with each published value.
public void onNext(T t);
// Called when the sequence fails with the given error.
public void onError(Throwable t);
// Called when the sequence finishes successfully.
public void onComplete();
}
Subscription
Subscription表示Publisher和Subscriber的对应关系
- request():向Publisher请求数据
- cancel():让Publisher停止发布数据
// Handle that ties one Subscriber to one Publisher.
public interface Subscription {
// Asks the Publisher for up to n more values.
public void request(long n);
// Asks the Publisher to stop publishing.
public void cancel();
}
Scheduler
createWorker()用于创建Worker ,具体的调度工作由Worker的schedule()完成
// Excerpt: a Scheduler hands out Workers; the Workers do the actual scheduling.
public abstract class Scheduler {
// Creates a new Worker bound to this Scheduler.
public abstract Worker createWorker();
// Unit of scheduling; can be disposed.
public abstract static class Worker implements Disposable {
// Schedules the Runnable with no delay by delegating to the delayed overload.
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
// Schedules the Runnable after the given delay; implemented by each concrete Worker.
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
}
}
source传递过程
fromCallable()创建FlowableFromCallable,传递callable
// Excerpt of Flowable: factory that wraps a Callable.
public abstract class Flowable<@NonNull T> implements Publisher<T> {
// Returns a FlowableFromCallable holding the callable, passed through the onAssembly plugin hook.
public static <@NonNull T> Flowable<T> fromCallable(@NonNull Callable<? extends T> callable) {
return RxJavaPlugins.onAssembly(new FlowableFromCallable<>(callable));
}
}
subscribeOn()创建FlowableSubscribeOn,传递this(即FlowableFromCallable)作为source
// Excerpt of Flowable: the subscribeOn operator.
public abstract class Flowable<@NonNull T> implements Publisher<T> {
// Convenience overload: requestOn is true unless this Flowable is a FlowableCreate.
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}
// Wraps this Flowable (as the upstream source) in a FlowableSubscribeOn.
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<>(this, scheduler, requestOn));
}
}
observeOn()创建FlowableObserveOn,传递this(即FlowableSubscribeOn)作为source
// Excerpt of Flowable: the observeOn operator.
public abstract class Flowable<@NonNull T> implements Publisher<T> {
// Convenience overload: delayError = false and the default bufferSize().
public final Flowable<T> observeOn(@NonNull Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
// Validates the arguments, then wraps this Flowable (as the upstream source) in a FlowableObserveOn.
public final Flowable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new FlowableObserveOn<>(this, scheduler, delayError, bufferSize));
}
}
即依次将自身当作Flowable,作为参数source传递给下一个Flowable
subscribe()流程
subscribe()最终调用具体Flowable的subscribeActual()
// Excerpt of Flowable: every subscribe() overload ends in subscribeActual().
public abstract class Flowable<@NonNull T> implements Publisher<T> {
......
// Two-callback overload: supplies an empty onComplete action.
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION);
}
// Wraps the callbacks in a LambdaSubscriber, subscribes it, and returns it as the Disposable.
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
@NonNull Action onComplete) {
.....
// RequestMax.INSTANCE is passed as the subscriber's onSubscribe callback.
LambdaSubscriber<T> ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
subscribe(ls);
return ls;
}
// Passes the subscriber through the onSubscribe plugin hook, then calls subscribeActual().
public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
try {
Subscriber<? super T> flowableSubscriber = RxJavaPlugins.onSubscribe(this, subscriber);
......
subscribeActual(flowableSubscriber);
}......
}
// Operator-specific subscription logic, implemented by each concrete Flowable.
protected abstract void subscribeActual(@NonNull Subscriber<? super T> subscriber);
}
调用过程和传递过程是相反的,先调用FlowableObserveOn的subscribeActual()
// Excerpt of FlowableObserveOn: subscription entry point.
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
......
// Creates a Worker from the scheduler and subscribes a wrapping subscriber to the upstream source.
@Override
public void subscribeActual(Subscriber<? super T> s) {
Worker worker = scheduler.createWorker();
if (s instanceof ConditionalSubscriber) {
.....
} else {
// The ObserveOnSubscriber wraps the downstream subscriber together with the worker.
source.subscribe(new ObserveOnSubscriber<>(s, worker, delayError, prefetch));
}
}
}
上面的source就是上一层传递下来的FlowableSubscribeOn,即调用到FlowableSubscribeOn的subscribeActual()
// Excerpt of FlowableSubscribeOn: subscribing to the upstream source is deferred to a Worker.
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
......
// Creates a Worker and schedules the SubscribeOnSubscriber (a Runnable) on it.
@Override
public void subscribeActual(final Subscriber<? super T> s) {
Scheduler.Worker w = scheduler.createWorker();
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
.....
w.schedule(sos);
}
// Subscriber placed between upstream and downstream; the AtomicReference<Thread> value holds a thread.
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
implements FlowableSubscriber<T>, Subscription, Runnable {
......
// Executed by the Worker: subscribes to the upstream source from the current thread.
@Override
public void run() {
// Record the thread run() is executing on; this does not switch threads.
lazySet(Thread.currentThread());
Publisher<T> src = source;
// Clear the field before subscribing; src keeps the reference for the call below.
source = null;
src.subscribe(this);
}
}
}
schedule()最终会在Worker所在的线程中调用run()方法,lazySet()只是记录当前线程(供requestUpstream()判断是否已处于调度线程),真正的线程切换由Worker的schedule()完成;上面的source就是上一层传递下来的FlowableFromCallable,即将FlowableFromCallable的subscribeActual()放到指定线程中运行
// Excerpt of FlowableFromCallable: runs the Callable at subscription time.
public final class FlowableFromCallable<T> extends Flowable<T> implements Supplier<T> {
final Callable<? extends T> callable;
......
@Override
public void subscribeActual(Subscriber<? super T> s) {
// Hand the downstream its Subscription before any value is produced.
DeferredScalarSubscription<T> deferred = new DeferredScalarSubscription<>(s);
s.onSubscribe(deferred);
T t;
try {
// A null result is rejected with the message below.
t = Objects.requireNonNull(callable.call(), "The callable returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// After cancellation the error goes to the plugin error handler instead of the subscriber.
if (deferred.isCancelled()) {
RxJavaPlugins.onError(ex);
} else {
s.onError(ex);
}
return;
}
// Success: pass the value to the subscription for delivery.
deferred.complete(t);
}
......
}
上面若出错回调onError(),否则调用downstream的onNext()传递结果
// Excerpt of DeferredScalarSubscription: delivery of the single value.
public class DeferredScalarSubscription<@NonNull T> extends BasicIntQueueSubscription<T> {
// Called once the value is available.
public final void complete(T v) {
int state = get();
for (;;) {
......
// The downstream has already requested: emit the value now, then complete.
if (state == HAS_REQUEST_NO_VALUE) {
lazySet(HAS_REQUEST_HAS_VALUE);
Subscriber<? super T> a = downstream;
a.onNext(v);
// Skip onComplete if the state is CANCELLED after onNext returns.
if (get() != CANCELLED) {
a.onComplete();
}
return;
}
// Otherwise store the value in the field (the remaining steps are elided).
value = v;
......
}
}
}
onNext()过程
调用FlowableSubscribeOn.SubscribeOnSubscriber的onNext(),调用downstream的onNext()传递结果
// Excerpt of FlowableSubscribeOn: value forwarding.
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
implements FlowableSubscriber<T>, Subscription, Runnable {
// Passes each value straight through to the downstream subscriber.
@Override
public void onNext(T t) {
downstream.onNext(t);
}
}
}
调用FlowableObserveOn.BaseObserveOnSubscriber的onNext()、trySchedule(),schedule()最终会调用run()方法,根据sourceMode判断是同步还是异步
- FlowableObserveOn.ObserveOnSubscriber的runSync()和runAsync()都调用downstream的onNext()传递结果
// Excerpt of FlowableObserveOn: values are handed to the Worker and emitted from run().
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
.....
// Shared base of the observeOn subscribers; it is the Runnable that gets scheduled.
abstract static class BaseObserveOnSubscriber<T>
extends BasicIntQueueSubscription<T>
implements FlowableSubscriber<T>, Runnable {
// Receives an upstream value (handling elided), then asks for a drain to be scheduled.
@Override
public final void onNext(T t) {
......
trySchedule();
}
// Schedules this subscriber on the worker (guard logic elided).
final void trySchedule() {
......
worker.schedule(this);
}
// Executed by the worker: picks the drain routine from outputFused / sourceMode.
@Override
public final void run() {
if (outputFused) {
runBackfused();
} else if (sourceMode == SYNC) {
runSync();
} else {
runAsync();
}
}
}
// Concrete subscriber used for a non-conditional downstream.
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
implements FlowableSubscriber<T> {
// Drain loop used when sourceMode == SYNC: emits values to the downstream.
void runSync() {
......
final Subscriber<? super T> a = downstream;
......
for (;;) {
......
while (e != r) {
......
a.onNext(v);
......
}......
}
}
.....
// Drain loop used otherwise: emits values to the downstream.
@Override
void runAsync() {
final Subscriber<? super T> a = downstream;
for (;;) {
......
while (e != r) {
.....
a.onNext(v);
.....
}
.....
}
}
}
}
调用LambdaSubscriber的onNext(),通过传入的Consumer消费掉最终的结果,即通过System.out::println打印出来
// Excerpt of LambdaSubscriber: the terminal subscriber built from the user's callbacks.
public final class LambdaSubscriber<T> extends AtomicReference<Subscription>
implements FlowableSubscriber<T>, Subscription, Disposable, LambdaConsumerIntrospection {
.....
// Hands the value to the onNext consumer unless this subscriber is disposed.
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// The consumer threw: cancel the stored Subscription and report through onError.
get().cancel();
onError(e);
}
}
}
}
onSubscribe()和request()流程
FlowableFromCallable回调下一层的onSubscribe(),其将Subscription存到upstream
// Excerpt of FlowableFromCallable: the onSubscribe handshake.
public final class FlowableFromCallable<T> extends Flowable<T> implements Supplier<T> {
final Callable<? extends T> callable;
@Override
public void subscribeActual(Subscriber<? super T> s) {
// Create the Subscription and pass it to the downstream's onSubscribe().
DeferredScalarSubscription<T> deferred = new DeferredScalarSubscription<>(s);
s.onSubscribe(deferred);
......
}
}
// Excerpt of FlowableSubscribeOn: receiving the upstream Subscription.
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
implements FlowableSubscriber<T>, Subscription, Runnable {
@Override
public void onSubscribe(Subscription s) {
// Store the upstream Subscription in this.upstream; the body runs only if setOnce returns true.
if (SubscriptionHelper.setOnce(this.upstream, s)) {
......
}
}
}
}
FlowableSubscribeOn回调下一层的onSubscribe(),其回调下一层的onSubscribe()和上一层的request()请求数据
// Excerpt of FlowableSubscribeOn: the onSubscribe handshake with the downstream.
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
......
@Override
public void subscribeActual(final Subscriber<? super T> s) {
Scheduler.Worker w = scheduler.createWorker();
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
// The SubscribeOnSubscriber is itself the Subscription handed to the downstream.
s.onSubscribe(sos);
}
}
// Excerpt of FlowableObserveOn: receiving the upstream Subscription.
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
implements FlowableSubscriber<T> {
......
@Override
public void onSubscribe(Subscription s) {
// Proceed only if the Subscription passes validation against the current upstream.
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
......
// Create a queue with capacity prefetch.
queue = new SpscArrayQueue<>(prefetch);
// Hand this subscriber to the downstream as its Subscription, then request prefetch items upstream.
downstream.onSubscribe(this);
s.request(prefetch);
}
}
}
}
LambdaSubscriber在onSubscribe()中利用FlowableInternalHelper.RequestMax的accept()调用上一层(ObserveOnSubscriber)的request(Long.MAX_VALUE),request()再通过trySchedule()调用worker.schedule()触发数据分发
// Excerpt of LambdaSubscriber: receiving the upstream Subscription.
public final class LambdaSubscriber<T> extends AtomicReference<Subscription>
implements FlowableSubscriber<T>, Subscription, Disposable, LambdaConsumerIntrospection {
@Override
public void onSubscribe(Subscription s) {
// Store the Subscription in this AtomicReference; continue only if setOnce returns true.
if (SubscriptionHelper.setOnce(this, s)) {
try {
// Invoke the onSubscribe consumer with this subscriber acting as the Subscription.
onSubscribe.accept(this);
} catch (Throwable ex) {
......
}
}
}
}
// Excerpt of FlowableInternalHelper.
public final class FlowableInternalHelper {
// Singleton Consumer that requests Long.MAX_VALUE items from the Subscription it is given.
public enum RequestMax implements Consumer<Subscription> {
INSTANCE;
@Override
public void accept(Subscription t) {
t.request(Long.MAX_VALUE);
}
}
}
// Excerpt of FlowableObserveOn: handling a request from the downstream.
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
abstract static class BaseObserveOnSubscriber<T>
extends BasicIntQueueSubscription<T>
implements FlowableSubscriber<T>, Runnable {
// Adds a valid n to the requested counter, then asks for a drain to be scheduled.
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(requested, n);
trySchedule();
}
}
// Schedules this subscriber on the worker (guard logic elided).
final void trySchedule() {
......
worker.schedule(this);
}
}
}
FlowableSubscribeOn.SubscribeOnSubscriber的request()通过requestUpstream()判断当前线程:若nonScheduledRequests为true,或当前线程就是run()中记录的线程,则直接调用上一层的request();否则通过worker.schedule()切换到调度线程后再调用上一层的request()
// Excerpt of FlowableSubscribeOn: forwarding a downstream request to the upstream.
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
implements FlowableSubscriber<T>, Subscription, Runnable {
// Forwards a valid request once the upstream Subscription is available.
@Override
public void request(final long n) {
if (SubscriptionHelper.validate(n)) {
Subscription s = this.upstream.get();
if (s != null) {
requestUpstream(n, s);
} else {
......
}
}
}
// Requests directly when nonScheduledRequests is set or the caller is already on the
// thread stored in this AtomicReference; otherwise schedules the request on the worker.
void requestUpstream(final long n, final Subscription s) {
if (nonScheduledRequests || Thread.currentThread() == get()) {
s.request(n);
} else {
worker.schedule(new Request(s, n));
}
}
// Runnable that performs upstream.request(n) when the worker runs it.
static final class Request implements Runnable {
......
@Override
public void run() {
upstream.request(n);
}
}
}
}
DeferredScalarSubscription接收到请求后,将值传给downstream的onNext()
// Excerpt of DeferredScalarSubscription: handling a request from the downstream.
public class DeferredScalarSubscription<@NonNull T> extends BasicIntQueueSubscription<T> {
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
for (;;) {
int state = get();
......
// A value is already stored and was waiting for a request.
if (state == NO_REQUEST_HAS_VALUE) {
// Emit only if this call wins the state transition.
if (compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) {
T v = value;
if (v != null) {
// Clear the field, then emit the value followed by completion.
value = null;
Subscriber<? super T> a = downstream;
a.onNext(v);
// Skip onComplete if the state is CANCELLED after onNext returns.
if (get() != CANCELLED) {
a.onComplete();
}
}
}
return;
}
......
}
}
}
}
Schedulers.io()调度过程
Schedulers.io() = Schedulers.IO = IOTask() = IoHolder.DEFAULT = IoScheduler()
// Excerpt of Schedulers: how io() resolves to an IoScheduler.
public final class Schedulers {
// The io scheduler instance, assigned in the static initializer below.
static final Scheduler IO;
// Holder class whose DEFAULT field is a new IoScheduler.
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static {
// The scheduler is obtained through the initIoScheduler plugin hook from an IOTask supplier.
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
// Returns IO after passing it through the onIoScheduler plugin hook.
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
// Supplier that returns IoHolder.DEFAULT.
static final class IOTask implements Supplier<Scheduler> {
@Override
public Scheduler get() {
return IoHolder.DEFAULT;
}
}
}
FlowableSubscribeOn的subscribeActual()通过IoScheduler创建Worker并调用schedule()
// Excerpt of FlowableSubscribeOn: where the scheduler's Worker is created and used.
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
......
@Override
public void subscribeActual(final Subscriber<? super T> s) {
// Obtain a Worker from the scheduler given to subscribeOn().
Scheduler.Worker w = scheduler.createWorker();
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
.....
// Schedule the subscriber (a Runnable) on that Worker.
w.schedule(sos);
}
}
调用IoScheduler的createWorker()会返回EventLoopWorker
// Excerpt of IoScheduler: Worker creation.
public final class IoScheduler extends Scheduler {
// Returns a new EventLoopWorker built from the current value of pool.
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
}
调用IoScheduler.EventLoopWorker的schedule()最终调用ThreadWorker的父类NewThreadWorker的scheduleActual()
// Excerpt of IoScheduler: the Worker types.
public final class IoScheduler extends Scheduler {
static final class EventLoopWorker extends Scheduler.Worker implements Runnable {
// Delegates to threadWorker.scheduleActual(), passing tasks as the parent container.
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
......
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
// ThreadWorker is a NewThreadWorker subclass (body elided).
static final class ThreadWorker extends NewThreadWorker {
......
}
}
调用scheduleActual()将Runnable封装成ScheduledRunnable,通过ScheduledThreadPoolExecutor的submit()或schedule()提交
// Excerpt of NewThreadWorker: a Worker backed by a ScheduledExecutorService.
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
// Creates the executor from the given thread factory via SchedulerPoolFactory.
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
// Wraps the Runnable in a ScheduledRunnable and hands it to the executor.
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
// Let the onSchedule plugin hook decorate the task first.
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
......
Future<?> f;
try {
// No delay: submit immediately; otherwise schedule with the delay.
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
// Give the resulting Future to the ScheduledRunnable.
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
......
}
return sr;
}
}
// Excerpt of SchedulerPoolFactory: executor creation.
public final class SchedulerPoolFactory {
// Creates a ScheduledThreadPoolExecutor with a core pool size of 1 using the given thread factory.
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
// The remove-on-cancel policy is set from PURGE_ENABLED.
exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
return exec;
}
}
线程池会在工作线程中调用FlowableSubscribeOn.SubscribeOnSubscriber的run()方法,SubscribeOnSubscriber继承了AtomicReference<Thread>,通过lazySet()记录当前工作线程(并不负责切换线程),然后在该线程中调用上一层source的subscribe()
// Excerpt of FlowableSubscribeOn: the task that the Worker executes.
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
implements FlowableSubscriber<T>, Subscription, Runnable {
@Override
public void run() {
// Record the thread run() is executing on in this AtomicReference.
lazySet(Thread.currentThread());
Publisher<T> src = source;
// Clear the field before subscribing; src keeps the reference for the call below.
source = null;
// Subscribe to the upstream source from this thread.
src.subscribe(this);
}
}
}
AndroidSchedulers.mainThread()调度过程
AndroidSchedulers.mainThread() = AndroidSchedulers.MAIN_THREAD = MainHolder.DEFAULT = HandlerScheduler(),通过主线程Looper创建handler
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = internalFrom(Looper.getMainLooper(), true);
}
private static final Scheduler MAIN_THREAD =
RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
}
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static Scheduler internalFrom(Looper looper, boolean async) {
......
return new HandlerScheduler(new Handler(looper), async);
}
}
FlowableObserveOn的subscribeActual()通过HandlerScheduler创建Worker,在onNext()的trySchedule()调用schedule()
// Excerpt of FlowableObserveOn: where the scheduler's Worker is created and used.
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
@Override
public void subscribeActual(Subscriber<? super T> s) {
// Obtain a Worker from the scheduler given to observeOn().
Worker worker = scheduler.createWorker();
if (s instanceof ConditionalSubscriber) {
......
} else {
// Subscribe upstream with a subscriber that carries the worker.
source.subscribe(new ObserveOnSubscriber<>(s, worker, delayError, prefetch));
}
}
abstract static class BaseObserveOnSubscriber<T>
extends BasicIntQueueSubscription<T>
implements FlowableSubscriber<T>, Runnable {
// Receives an upstream value (handling elided), then asks for a drain to be scheduled.
@Override
public final void onNext(T t) {
......
trySchedule();
}
// Schedules this subscriber (a Runnable) on the worker (guard logic elided).
final void trySchedule() {
......
worker.schedule(this);
}
}
}
调用HandlerScheduler的createWorker()返回HandlerWorker()
// Excerpt of HandlerScheduler: Worker creation.
final class HandlerScheduler extends Scheduler {
// Returns a new HandlerWorker that uses this scheduler's handler and async flag.
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
}
调用HandlerScheduler.HandlerWorker的schedule(),将Runnable封装成ScheduledRunnable,调用主线程handler的sendMessageDelayed()
// Excerpt of HandlerScheduler: scheduling by posting a Message to the Handler.
final class HandlerScheduler extends Scheduler {
private static final class HandlerWorker extends Worker {
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
......
// Let the onSchedule plugin hook decorate the task, then wrap it in a ScheduledRunnable.
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// Build a Message whose callback is the wrapped task.
Message message = Message.obtain(handler, scheduled);
// The message's obj field is set to this worker.
message.obj = this;
if (async) {
message.setAsynchronous(true);
}
// Post the message with the delay converted to milliseconds.
handler.sendMessageDelayed(message, unit.toMillis(delay));
......
return scheduled;
}
}
}
最终主线程会调用FlowableObserveOn.BaseObserveOnSubscriber的run(),根据sourceMode判断是同步还是异步
- FlowableObserveOn.ObserveOnSubscriber的runSync()和runAsync()都调用downstream的onNext()传递结果
// Excerpt of FlowableObserveOn: the task executed by the Worker and its drain loops.
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
.....
abstract static class BaseObserveOnSubscriber<T>
extends BasicIntQueueSubscription<T>
implements FlowableSubscriber<T>, Runnable {
......
// Picks the drain routine from outputFused / sourceMode.
@Override
public final void run() {
if (outputFused) {
runBackfused();
} else if (sourceMode == SYNC) {
runSync();
} else {
runAsync();
}
}
}
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
implements FlowableSubscriber<T> {
// Drain loop used when sourceMode == SYNC: emits values to the downstream.
void runSync() {
......
final Subscriber<? super T> a = downstream;
......
for (;;) {
......
while (e != r) {
......
a.onNext(v);
......
}......
}
}
.....
// Drain loop used otherwise: emits values to the downstream.
@Override
void runAsync() {
final Subscriber<? super T> a = downstream;
for (;;) {
......
while (e != r) {
.....
a.onNext(v);
.....
}
.....
}
}
}
}