浅析Android Handler机制实现原理
0. 背景描述
Android系统出于对简单、高效的考虑,在设计UI
体系时采用了单线程模型,即不会在多个线程中对同一个UI
界面执行操作。简单是指单线程模型可以不用考虑和处理在多线程环境下操作UI
带来的线程安全问题,高效是指单线程模型下无需通过额外的手段处理多线程安全问题(一般通过加锁等方式解决线程安全问题,而使用锁进行同步就会存在性能开销)。同时,因为Android应用属于UI
交互型应用,需要对用户的操作进行快速响应,因此在处理UI
操作的线程中不能执行网络请求、本地IO
等这一类耗时操作,否则会导致用户的操作不能及时被响应处理,给用户带来不好的使用体验。
为了在单线程模型下保证用户交互体验的同时,Android应用可以处理必要耗时任务,Android系统引入Handler
机制来解决不能在主线程执行耗时操作的问题。
上面提到Android系统对UI
操作存在线程方面的限制,这一点可以从源码中找到相关的逻辑,下面是相关的源码:
public final class ActivityThread extends ClientTransactionHandler implements ActivityThreadInternal {
// ...
@UnsupportedAppUsage
final H mH = new H(); // 主线程创建的Handler
// ...
public static void main(String[] args) {
// ...
// 创建主线程的Looper
Looper.prepareMainLooper();
// ...
// 在主线程创建ActivityThread实例
ActivityThread thread = new ActivityThread();
thread.attach(false, startSeq);
// ...
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");
}
}
public final class ViewRootImpl implements ViewParent, View.AttachInfo.Callbacks, ThreadedRenderer.DrawCallbacks, AttachedSurfaceControl {
public ViewRootImpl(@UiContext Context context, Display display, IWindowSession session, boolean useSfChoreographer) {
// ...
mThread = Thread.currentThread();
// ...
}
@Override
public void requestLayout() {
if (!mHandlingLayoutInLayoutRequest) {
checkThread();
mLayoutRequested = true;
scheduleTraversals();
}
}
@Override
public void invalidateChild(View child, Rect dirty) {
invalidateChildInParent(null, dirty);
}
@Override
public ViewParent invalidateChildInParent(int[] location, Rect dirty) {
checkThread();
// ...
}
void checkThread() {
if (mThread != Thread.currentThread()) {
throw new CalledFromWrongThreadException("Only the original thread that created a view hierarchy can touch its views.");
}
}
}
@UiThread
public class View implements Drawable.Callback, KeyEvent.Callback, AccessibilityEventSource {
public void invalidate() {
invalidate(true);
}
@UnsupportedAppUsage
public void invalidate(boolean invalidateCache) {
invalidateInternal(0, 0, mRight - mLeft, mBottom - mTop, invalidateCache, true);
}
void invalidateInternal(int l, int t, int r, int b, boolean invalidateCache, boolean fullInvalidate) {
// ...
if ((mPrivateFlags & (PFLAG_DRAWN | PFLAG_HAS_BOUNDS)) == (PFLAG_DRAWN | PFLAG_HAS_BOUNDS) || (invalidateCache && (mPrivateFlags & PFLAG_DRAWING_CACHE_VALID) == PFLAG_DRAWING_CACHE_VALID) || (mPrivateFlags & PFLAG_INVALIDATED) != PFLAG_INVALIDATED || (fullInvalidate && isOpaque() != mLastIsOpaque)) {
final ViewParent p = mParent;
if (p != null && ai != null && l < r && t < b) {
final Rect damage = ai.mTmpInvalRect;
damage.set(l, t, r, b);
p.invalidateChild(this, damage);
}
// ...
}
}
}
从源码中可以看出,无论是调用ViewRootImpl#requestLayout
还是View#invalidate
来对UI
进行更新,最终都会调用ViewRootImpl#checkThread
检查当前线程与创建ViewRootImpl
实例时所在的线程是否一致,不一致将会抛出异常。而ViewRootImpl
实例的创建是在SystemServer
进程的AMS
通过Binder
调用到App
进程后,经过ActivityThread#mH
切换到主线程执行的,因此最终检查的时候如果更新UI
的线程不是主线程将会抛出异常。
1. 目的用途
根据上面的介绍,Handler
机制的主要作用是为Android应用开发提供线程切换的能力,实现在指定线程执行特定的任务。此外,Handler
机制还支持执行延时任务。
上面提到的线程切换其实也可以理解为线程间通信,而线程间通信一般分为显式通信和隐式通信,显式通信一般是通过发消息的方式来实现,隐式通信则是通过共享内存(静态变量的读写)来实现。而Android中的Handler
机制可以理解为前者,即显式地在线程间通过发送消息进行通信。
2. 使用方式
对实现原理的分析和理解是为了更好地使用Handler
机制来解决开发中遇到的问题,所以下面先对Handler
的使用方式进行说明,然后再分析其原理实现。Handler
的使用包括三个步骤:创建Handler
实例、使用Handler
发送消息以及使用Handler
处理消息。
2.1. 创建Handler实例
一般来说,日常开发中主要通过Handler
对象发送消息以及处理消息,所以先简单看下Handler
的构造函数。
public class Handler {
@UnsupportedAppUsage
final Looper mLooper;
final MessageQueue mQueue;
@UnsupportedAppUsage
final Callback mCallback;
final boolean mAsynchronous;
// 使用给定的Looper对象创建Handler对象
public Handler(@NonNull Looper looper) {
this(looper, null, false);
}
// 使用给定的Looper对象创建Handler对象,并传入callback参数用于处理消息。
public Handler(@NonNull Looper looper, @Nullable Callback callback) {
this(looper, callback, false);
}
@UnsupportedAppUsage
public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
}
从构造函数可以看出,创建Handler
对象时必须传递一个Looper
对象,然后Handler
对象持有Looper
对象以及MessageQueue
对象(Looper
对象直接持有)。因此有必要分析下Looper
对象的创建及其实现。
public final class Looper {
@UnsupportedAppUsage
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
@UnsupportedAppUsage
final MessageQueue mQueue;
final Thread mThread;
// 构造一个Looper对象,根据quitAllowed决定是否可以退出消息循环
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
private static void prepare(boolean quitAllowed) {
// 如果之前已经为当前线程创建过Looper对象,则会抛出异常,因此一个线程中只能创建一次Looper对象
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
// 将当前线程初始化为looper,即只能处理消息循环的线程
public static void prepare() {
prepare(true);
}
}
从Looper
对象的构造函数可以看出,一般情况下(反射方式除外)是通过静态方法Looper#prepare
来构造Looper
对象,而调用了Looper#prepare
方法的线程就具备了处理消息的能力,即之后可以调用Looper#loop
方法来开启循环来不断地处理消息。
Looper
构造函数中创建了MessageQueue
对象,并将当前所在线程记录到mThread
变量中,如果当前线程之前已经创建过Looper
对象就会抛出异常,见Looper#prepare
方法。
从上面的源码可以看出,一个Looper
对象中持有一个MessageQueue
对象,所以一个线程对应唯一一个Looper
对象以及唯一一个MessageQueue
对象。
因为Looper
对象中持有了MessageQueue
对象,从命名上可以看出MessageQueue
是一个消息队列,一般消息队列适用于存储消息,所以这里看下MessageQueue
的构造函数。
public final class MessageQueue {
@UnsupportedAppUsage
Message mMessages;
// True if the message queue can be quit.
@UnsupportedAppUsage
private final boolean mQuitAllowed;
@UnsupportedAppUsage
@SuppressWarnings("unused")
private long mPtr; // used by native code
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
private native static long nativeInit();
// ...
}
MessageQueue
的构造函数主要调用了nativeInit
这个native
方法,这个nativeInit
方法和Looper
线程(调用了Looper#prepare
方法的线程)的阻塞和唤醒有关,等到后面再对其进行进一步分析。
MessageQueue
类中还定义了一个Message
类型的成员变量mMessages
,Message
是用于承载信息的媒介,开发者可以通过Handler
对象发送Message
实例到Looper
线程。关于消息的生产后面会单独分析。
2.2. 使用Handler发送消息
在创建完Handler
对象之后就可以发送消息到消息队列MessageQueue
了,开发中一般使用post
方法将一个Runnable
对象封装为一个Message
对象后插入到队列中,或者使用sendMessageAtTime
方法将一个Message
对象插入到队列中,这两种方式的区别就在于消息的处理者,前者是由Runnable
对象完成消息的处理,后者则可能会经过多个不同处理者进行处理。
public class Handler {
public final boolean post(@NonNull Runnable r) {
return sendMessageDelayed(getPostMessage(r), 0);
}
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public final boolean postAtTime(@NonNull Runnable r, long uptimeMillis) {
return sendMessageAtTime(getPostMessage(r), uptimeMillis);
}
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
// ...
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg, long uptimeMillis) {
msg.target = this; // 将当前Handler实例赋值给消息的target成员变量,用于后续的消息分发处理
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
}
可以看到无论使用哪个方法发送消息,最终都是调用MessageQueue#enqueueMessage
方法将消息插入到MessageQueue
中。
2.3. 使用Handler处理消息
在Looper
线程开始循环获取之后,获取到的消息将会被分发处理,这里主要是通过Handler#dispatchMessage
方法实现。而Handler
实例是从哪里赋值的呢?前面有提到过,不管通过哪个方法发送消息,最终会走到Handler#enqueueMessage
方法,这个方法里会将当前的Handler
实例赋值到Message#target
变量中。
public final class Looper {
private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) {
// ...
try {
// 分发消息
msg.target.dispatchMessage(msg);
// ...
} catch (Exception exception) {
// ...
throw exception;
} finally {
// ...
}
// ...
// 回收消息,放回消息池
msg.recycleUnchecked();
return true;
}
}
public class Handler {
final Callback mCallback;
// ...
public Handler(@NonNull Looper looper, @Nullable Callback callback) {
this(looper, callback, false);
}
public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
public void dispatchMessage(@NonNull Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
}
如果通过post
方法发送的消息,直接由方法的入参Runnable
对象进行处理。否则,先看Handler
对象在构造时有无传入Callback
对象,有的话先交由Callback
对象处理,根据处理返回值判断是否需要继续交给Handler#handleMessage
方法进一步处理。如果没有Callback
对象,则直接交由Handler#handleMessage
方法处理。
可见,如果想要绕过Handler#handleMessage
方法,要么通过post
方法发送的消息,要么在构造Handler
对象时传入Callback
对象,通过Callback
对象拦截处理消息。
2.4. 小结
一般在开发中,首先创建Looper
对象使当前线程成为looper
线程,接着基于Looper
对象创建Handler
对象,用于向MessageQueue
对象中插入Message
对象,最终由Looper
对象进行消息的分发,并交由对应的Handler
对象进行处理。
3. 实现原理
从上面的内容可以看出,Handler
消息机制本质上就是生产消息和消费消息。消息的生产一般是通过Handler
向MessageQueue
中插入Message
来实现,消息的消费则是通过Looper
对MessageQueue
进行轮询获取Message
并对其进行分发处理。
下面从Handler
消息机制中消息的流转过程出发,结合源码对Handler
消息机制的内部实现进行分析,主要包含以下几个部分:
Handler
消息机制的主要组成部分Handler
消息机制中的消息发送Handler
消息机制中的消息分发处理Handler
消息机制中的线程阻塞与唤醒
3.1. 结构组成
上面对Handler
使用方式已经做了初步介绍,从Handler
使用的过程来看,整个Handler
消息机制主要由Looper
、MessageQueue
、Message
以及Handler
组成,各组成部分之间的关系如下图所示。
-
Looper
:循环获取消息并分发消息,线程单例(即每个线程最多只有一个Looper
实例)
Looper
对象持有MessageQueue
对象,通过死循环来实现对MessageQueue
对象存储的Message
对象进行获取和分发处理,主要方法包括Looper#prepare
、Looper#loop
: -
MessageQueue
:用于提供消息的存储和获取,线程单例
MessageQueue
对象持有队头消息Message
对象,并提供Message
对象的插入和获取,主要方法包括:MessageQueue#enqueueMessage
、MessageQueue#next
; -
Message
:信息的载体
Message
对象持有对其他Message
对象的引用,作为线程间通信的媒介,存储了跨线程传输的数据。
public final class Message implements Parcelable {
Message next;
// ...
}
-
Handler
:发送和处理消息(外部持有的句柄)
Handler
对象持有Looper
对象,用于发送消息Message
以及消息Message
的具体处理逻辑实现, -
ThreadLocal
:线程本地存储,用于在不同线程中维护各自独立的数据副本
通过ThreadLocal
实现了Looper
对象的线程单例,简化了不同线程间的Looper
对象的管理。
3.2. 消息的生产
- 创建消息
Message
类提供了公有的构造函数,不过从构造函数的注释来看,更推荐使用Message.obtain
方法获取/创建消息,这个方法待会再看,先看下Message
类的成员变量,可以看到有一个Message
类型的变量next
,如果了解过链表结构的实现的话,应该就会想到Message
类其实就是一个链表的节点类。
public final class Message implements Parcelable {
@UnsupportedAppUsage
/*package*/ Message next;
// ...
public static final Object sPoolSync = new Object();
private static Message sPool;
private static int sPoolSize = 0;
private static final int MAX_POOL_SIZE = 50;
/** Constructor (but the preferred way to get a Message is to call {@link #obtain() Message.obtain()}).
*/
public Message() {
}
//...
/**
* Return a new Message instance from the global pool. Allows us to
* avoid allocating new objects in many cases.
*/
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message();
}
/**
* Recycles a Message that may be in-use.
* Used internally by the MessageQueue and Looper when disposing of queued Messages.
*/
@UnsupportedAppUsage
void recycleUnchecked() {
// Mark the message as in use while it remains in the recycled object pool.
// Clear out all other details.
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = UID_NONE;
workSourceUid = UID_NONE;
when = 0;
target = null;
callback = null;
data = null;
synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
next = sPool;
sPool = this;
sPoolSize++;
}
}
}
}
接下来再看下Message.obtain
方法,Message
类持有了一个全局的对象池,当Message
对象使用之后将其放入对象池中进行复用,避免频繁创建Message
对象,这也体现了池化思想。
- 发送消息
一般发送消息使用的方法有两种,一种是传递一个Runnable
对象(比如postXXX
方法),一种是传递一个Message
对象(比如sendMessageXXX
方法),但是跟着代码看下去最终都是调用了Handler#enqueueMessage
方法,内部最终调用了MessageQueue#enqueueMessage
方法。
public class Handler {
public final boolean post(@NonNull Runnable r) {
return sendMessageDelayed(getPostMessage(r), 0);
}
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public final boolean postAtTime(@NonNull Runnable r, long uptimeMillis) {
return sendMessageAtTime(getPostMessage(r), uptimeMillis);
}
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
}
从源码可以看出,无论是调用哪个方法发送消息,最终都会调用MessageQueue#enqueueMessage
方法并传递一个Message
对象和一个时间戳when
到MessageQueue
中,根据时间戳when
判断应该插入到消息队列的哪个位置。
从插入消息的方法来看,通过sendEmptyMessageAtTime
或者postAtTime
发送的消息,其时间戳when
的取值没有限制,可小于当前系统的时间戳;通过sendMessageAtFrontOfQueue
发送的消息,会被插入到消息队列的头部;其他的方法则可以指定一个相对于当前时间戳的延时,当前时间戳加上延时就是消息最终的时间戳when
。
- 消息入队
下面看下MessageQueue#enqueueMessage
方法的实现,首先会对消息msg
的成员变量进行检查,要求必须消息msg
的target
不能为null
(target
为null
则消息将无法被分发处理),并且消息msg
不能处于使用中的状态,最后检查Looper
所在的线程是否已经quit
,如果已经quit
过,那么没必要再插入消息了,因为插入了消息也不会被处理。
public final class MessageQueue {
private boolean mQuitting;
@UnsupportedAppUsage
Message mMessages;
// Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
private boolean mBlocked;
@UnsupportedAppUsage
@SuppressWarnings("unused")
private long mPtr; // used by native code
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
synchronized (this) {
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
if (mQuitting) {
IllegalStateException e = new IllegalStateException(msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
// 1. 消息队列为空,即头结点为null,则直接作为头结点插入
// 2. 待插入消息的时间戳为0,则直接作为头结点插入
// 3. 待插入消息的时间戳小于头节点的时间戳,则作为头结点插入
if (p == null || when == 0 || when < p.when) {
// 待插入消息作为头结点插入时,如果当前Looper线程处于阻塞状态,那么就将其唤醒.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else { // 在队列的中间部分插入消息。
// 如果队列的头部消息是一个同步屏障并且待插入的消息msg是队列中最靠前的消息(时间戳最小)
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
// 如果在遍历链表的过程中发现了异步消息,则重置needWake,因为待插入的消息一定不是第一个异步消息
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// 因为上面检查了mQuitting不为true,所以mPtr应该也不是0。
if (needWake) {
// 唤醒当前被阻塞的线程
nativeWake(mPtr);
}
}
return true;
}
}
从源码可以看出,消息的入队分两种情况:
- 消息插入队头
如果队列中没有元素或者待插入消息的时间戳为0或者小于队首消息的时间戳,直接将消息作为头结点插入队头。此外如果Looper
线程处于阻塞状态还需要唤醒线程,让线程进行处理消息。
补充说明:这里插入的消息一定不是同步屏障,但可能是异步消息,因此如果消息不是同步屏障,则插入队头时都会唤醒被阻塞的Looper
线程。
- 消息插入队中或队尾
根据待插入的消息的时间戳遍历链表,寻找插入位置并插入消息。这种情况下需要判断队头的消息是否为同步屏障,如果队头消息是同步屏障,则根据待插入的消息是否为队列中第一个异步消息来决定是否唤醒被阻塞的Looper
线程。
下面看下插入同步屏障的方法,可以看到同步屏障的时间戳就是插入消息队列时的系统时间戳。因此同步屏障并不是默认插入队头的,所以在同步屏障插入之后,有可能因为处理更靠前的同步消息而一直无法位于同步屏障之后的异步消息。同时,同步屏障只能保证位于其后面的异步消息优先于同步消息的处理。
注意,插入同步屏障不会唤醒被阻塞的Looper
线程。
public final class MessageQueue {
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}
}
3.3. 消息的消费
当消息被插入消息队列之后,Looper
就可以对消息队列中的消息进行分发处理,其主要的代码实现在Looper#loop
中。
public final class Looper {
/**
* 返回当前线程持有的Looper实例,如果当前线程还没有关联Looper的话则返回null。
*/
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
/**
* 在当前线程对消息队列进行循环处理,线程退出之前必须调用quit方法结束循环.
*/
@SuppressWarnings("AndroidFrameworkBinderIdentity")
public static void loop() {
// 获取当前线程的Looper
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
// 重复调用loop方法时输出日志
if (me.mInLoop) {
Slog.w(TAG, "Loop again would have the queued messages be executed before this one completed.");
}
me.mInLoop = true;
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
// Allow overriding a threshold with a system prop. e.g.
// adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
final int thresholdOverride = SystemProperties.getInt("log.looper." + Process.myUid() + "." + Thread.currentThread().getName() + ".slow", 0);
me.mSlowDeliveryDetected = false;
// 死循环调用loopOnce,直到loopOnce返回false时才会退出循环,返回false说明Looper线程已经退出。
for (;;) {
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}
/**
* 获取并分发一个message。
*/
@SuppressWarnings("AndroidFrameworkBinderIdentity")
private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) {
Message msg = me.mQueue.next(); // 可能会阻塞当前线程
if (msg == null) {
// 没有获取到消息说明消息队列正在退出。
return false;
}
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " + msg.callback + ": " + msg.what);
}
// ...
try {
// 分发消息
msg.target.dispatchMessage(msg);
// ...
} catch (Exception exception) {
// ...
throw exception;
} finally {
// ...
}
// ...
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// ...
// 回收消息,放回消息池
msg.recycleUnchecked();
return true;
}
}
3.3.1. 消息的获取
Looper#loop
方法中循环调用Looper#loopOnce
方法获取一个消息,Looper#loopOnce
方法中调用了MessageQueue#next
方法从消息队列中获取消息,当前线程可能会阻塞在MessageQueue#next
方法的调用处。
当MessageQueue#next
方法返回了一个消息之后,就会将消息进行分发处理,并在处理完之后将消息进行回收。
public final class MessageQueue {
@UnsupportedAppUsage
@SuppressWarnings("unused")
private long mPtr; // used by native code
@UnsupportedAppUsage
Message mMessages;
// Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
private boolean mBlocked;
private boolean mQuitting;
@UnsupportedAppUsage
private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
private IdleHandler[] mPendingIdleHandlers;
@UnsupportedAppUsage
Message next() {
// 如果消息循环已经退出,这里将会直接返回null。
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// 通过JNI调用到native层,当前线程可能会被阻塞
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// 尝试获取下一个消息
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// 1. 队头消息是同步屏障的情况
if (msg != null && msg.target == null) {
// 寻找第一个异步消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
// msg不为null的时候,要么msg是异步消息,要么是队头消息
if (msg != null) {
if (now < msg.when) {
// 当前消息的时间戳大于当前系统的时间戳,设置超时后进入阻塞,等到超时之后重新获取消息
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 当前消息已经可以处理(当前时间已经超过消息指定的时间戳),从链表中移除当前消息并返回
mBlocked = false; // 返回前设置为非阻塞状态,避免重复唤醒
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
msg.markInUse();
return msg;
}
} else {
// 要么链表中没有消息,要么队头消息是同步屏障且链表中没有异步消息,此时进入阻塞状态(无超时)
nextPollTimeoutMillis = -1;
}
// 已经退出Looper,调用dispose方法进行销毁native层资源。
if (mQuitting) {
dispose();
return null;
}
// 如果是第一次没有获取到消息(即当前线程处于空闲状态),那么获取idlers并执行。
// IdleHandler只会在队列为空或者队列中第一个消息的处理时机还没到的时候才会执行。
if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
// 如果没有可执行的IdleHandler则进入阻塞状态
if (pendingIdleHandlerCount <= 0) {
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// 遍历执行IdleHandler,我们只会在第一次循环时进入这里。
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// 将pendingIdleHandlerCount重置为0,这样下一次循环就不会再次执行这些IdleHandler了。
pendingIdleHandlerCount = 0;
// 如果执行了IdleHandler,那么可能会有一个消息可以进行处理(新消息或者消息到了可以处理的时间),下次循环直接再次获取消息,而不是进入阻塞状态。
nextPollTimeoutMillis = 0;
}
}
@UnsupportedAppUsage
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
// ...
}
MessageQueue#next
方法很重要,下面总结下关键的逻辑和步骤。
首先,第一次循环时尝试从队列中获取消息,如果获取到了超时消息,直接取消阻塞状态并返回超时消息用于后续的分发处理;如果没有获取到超时消息,这里有两种情况,一种是消息未超时,还有一种是没有消息:
- 消息未超时:设置超时时间为处理消息还需要等待的时间;
- 没有消息:设置超时时间为-1;
不管是哪种情况,都会去判断是否执行mIdleHandlers
里的IdleHandler
,但是只有在没有消息的时候才会执行,否则,除非同步屏障消息的时间戳大于当前时间戳(一般不会,因为MessageQueue#nextpostSyncBarrier
插入的时间戳都是方法调用时的时间戳)。如果可以执行并执行了mIdleHandlers
里的IdleHandler
之后,将pendingIdleHandlerCount
置为0,这样下一次循环时将不会再次处理mIdleHandlers
里的IdleHandler
。
此外,第一次循环之后nextPollTimeoutMillis
被置为0,因此下次循环不会进行阻塞状态,而是直接尝试获取消息。
接着,后续的循环和第一次循环的处理逻辑基本一致,唯一的区别就在于:
- 不处理
mIdleHandlers
里的IdleHandler
; - 可能会进入超时阻塞状态;
当MessageQueue#next
返回一个消息之后,再次进入MessageQueue#next
会重复上面的过程,因此mIdleHandlers
里的IdleHandler
在每次调用MessageQueue#next
方法之后都可能会被处理(一次MessageQueue#next
方法调用的过程中最多处理一次),相当于只会在两个消息之间处理一次mIdleHandlers
里的IdleHandler
。
3.3.2. 消息的分发处理
Looper
在循环获取消息之后,获取到超时消息后会分发处理消息,主要是通过Message#target
的dispatchMessage
方法进行分发处理,Message#target
是Handler
对象,对应了发送消息时的Handler
对象。
public final class Looper {
/**
* 获取并分发一个message。
*/
@SuppressWarnings("AndroidFrameworkBinderIdentity")
private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) {
// ...
try {
// 分发消息
msg.target.dispatchMessage(msg);
// ...
} catch (Exception exception) {
// ...
throw exception;
} finally {
// ...
}
// ...
// 回收消息,放回消息池
msg.recycleUnchecked();
return true;
}
}
public final class Message implements Parcelable {
@UnsupportedAppUsage
/*package*/ Handler target;
@UnsupportedAppUsage
/*package*/ Runnable callback;
// ...
}
public class Handler {
@UnsupportedAppUsage
final Callback mCallback;
// ...
/**
* Use the provided {@link Looper} instead of the default one and take a callback
* interface in which to handle messages.
*
* @param looper The looper, must not be null.
* @param callback The callback interface in which to handle messages, or null.
*/
public Handler(@NonNull Looper looper, @Nullable Callback callback) {
this(looper, callback, false);
}
@UnsupportedAppUsage
public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
public void dispatchMessage(@NonNull Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
}
从dispatchMessage
方法可以看出,消息首先交给Message#callback
,一般是通过postXXX
方法发送的消息的Message#callback
是有值的,值就是postXXX
方法的类型为Runnable
的参数;如果Message#callback
为null的话,则尝试将消息交给Handler#mCallback
进行处理,Handler#mCallback
是在构造Handler
对象时传入的;如果Handler#mCallback
为null的话,直接交由Handler#handleMessage
进行处理,否则先交由Handler#mCallback
处理,如果Handler#mCallback
返回false的话,再交由Handler#handleMessage
进行处理,否则直接返回。从这个实现来看,优先使用消息的callback
成员变量处理消息,其次是Handler#mCallback
,最后才是Handler#handleMessage
,可见组合优先于继承。
3.3.3. 小结
整体流程如下图所示:
3.4. 线程阻塞与唤醒
根据上文的分析可知,Handler
机制的关键就在于通过Looper
不断获取MessageQueue
中存放的Message
,这个是通过死循环来实现的。但是如果一个线程进入死循环会导致CPU
占用持续飙升,所以下面就来看下Handler
机制是如何在实现不断获取Message
的同时保证CPU
资源不会被一直占用。
3.4.1. 线程阻塞
首先看下获取消息的源码,可以看到Looper#loop
方法会不断通过MessageQueue#next
方法从消息队列中获取消息,从注释也可以看出,MessageQueue#next
方法的调用可能会导致当前线程被阻塞。
public final class Looper {
public static void loop() {
final Looper me = myLooper();
// ...
// 死循环调用loopOnce获取一个消息,如果返回false则说明需要退出死循环,即当前线程结束并退出。
for (;;) {
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}
private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) {
Message msg = me.mQueue.next(); // next方法可能会导致当前线程被阻塞
if (msg == null) { // 获取不到消息说明消息队列已经不再使用
return false;
}
try {
// 分发处理消息
msg.target.dispatchMessage(msg);
// ...
} catch (Exception exception) {
// ...
throw exception;
} finally {
// ...
}
msg.recycleUnchecked();
return true;
}
}
从MessageQueue#next
方法可以看出,在没有获取到可分发处理的消息时,将会调用nativePollOnce
这个native方法使当前线程进入阻塞状态,而何时退出阻塞状态,则取决于nextPollTimeoutMillis
的取值:
- 大于0,则等待主动唤醒或者超过
nextPollTimeoutMillis
毫秒之后自动退出阻塞状态; - 小于0,则只能等到主动唤醒,否则将会一直阻塞;
- 等于0,不会进入阻塞状态;
public final class MessageQueue {
@UnsupportedAppUsage
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
Message next() {
// 如果消息队列已经退出那么mPtr会被置为0.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
// ...
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// 如果nextPollTimeoutMillis大于0或者小于0,当前线程将会进入阻塞状态
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// ...
if (msg != null) {
if (now < msg.when) {
// 消息还没到处理的时间,设置阻塞等待的超时时间后下一次循环进入超时阻塞.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 获取到可以分发处理的消息,将阻塞状态置为false,下面将会继续分发处理消息
mBlocked = false;
return msg;
}
} else {
// 要么消息队列中没有消息,要么队首消息是同步屏障且消息队列中没有异步消息,下一次循环进入阻塞,直到有消息插入后再唤醒。
nextPollTimeoutMillis = -1;
}
// ...
}
// ...
// 走到这里说明在消息队列没有消息或者没有异步消息(队首消息是同步屏障)的时候处理了IdleHandler,因此需要将nextPollTimeoutMillis改为0,这样下一次循环将会再次尝试获取消息,之所以这么做是因为在处理IdleHandler的过程中,可能有消息插入到消息队列中。
nextPollTimeoutMillis = 0;
}
}
// ...
}
总结一下,发生线程阻塞的情况:
- 情况一:消息队列中没有消息时,或者队首消息是同步屏障但是消息队列中没有找到异步消息,下一次循环进入阻塞状态,直到有消息插入后才可能被唤醒;
- 情况二:消息队列中找到的消息还没有到达指定的执行时间,下一次循环进入超时阻塞状态,直到有消息插入或者超时才可能被唤醒;
3.4.2. 线程唤醒
下面再看下线程是什么时候结束阻塞状态,即何时被唤醒。从上面的分析可知,一般是在获取不到消息的时候线程会进入阻塞状态(获取到消息但是消息没有到达执行时间这种情况是超时阻塞,因此即使没有插入消息,只要超时也可以退出阻塞状态),那如果有消息可以获取,是不是就应该将线程唤醒,这样就可以获取到消息继续分发处理了。
public final class MessageQueue {
// 唤醒处于阻塞状态的Looper线程
private native static void nativeWake(long ptr);
boolean enqueueMessage(Message msg, long when) {
// ...
synchronized (this) {
// ...
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
// 1. 队列为空 2. 插入的消息的执行时间戳when为0 3. 插入的消息的执行时间戳小于队首消息的执行时间戳
if (p == null || when == 0 || when < p.when) {
// 消息msg作为队首消息进入消息队列,此时如果Looper线程处于阻塞状态,就需要唤醒线程。
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// 消息msg插入到队列中或者队列尾部,除非队首消息是同步屏障且插入的消息是最早的异步消息,否则不需要唤醒Looper线程。
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
// 如果插入的消息不是第一个异步消息,则不需要唤醒Looper线程
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p;
prev.next = msg;
}
// 因为需要唤醒Looper线程,所以调用nativeWake唤醒线程
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
public void removeSyncBarrier(int token) {
synchronized (this) {
Message prev = null;
Message p = mMessages;
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
final boolean needWake;
if (prev != null) {
// 待移除的同步屏障不是队首消息,这种情况下不需要唤醒Looper线程
prev.next = p.next;
needWake = false;
} else {
// 如果待移除的同步屏障是队首消息,并且移除之后,新的队首元素不是同步屏障,则唤醒Looper线程
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
p.recycleUnchecked();
// 如果正在退出loop,那么已经被唤醒过了,这里就不在进行唤醒了。
// 如果没有退出loop,那么调用nativeWake进行唤醒。
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}
}
从nativeWake
方法的调用点可以发现,当插入消息、移除同步屏障以及退出消息队列这三种情况发生时,可能会调用nativeWake
方法唤醒处于阻塞状态的Looper
线程。
- 插入消息:如果待插入的消息(可能是同步消息也可能是异步消息)是作为队首消息插入消息队列的,那么当线程处于阻塞状态时直接唤醒线程;如果队首消息是同步屏障且待插入的消息是队列中第一个异步消息,那么当线程处于阻塞状态时直接唤醒线程。
- 移除同步屏障:如果待移除的同步屏障是队首消息,并且移除同步屏障之后新的队首消息(可能为null)不是同步屏障,那么当线程处于阻塞状态时直接唤醒线程。
- 退出消息队列:当调用
quit
方法之后退出消息队列,会直接唤醒线程。
3.4.3. native层的实现
在从Java层分析的过程中,看到nativePollOnce
方法的入参有两个:创建MessageQueue
实例时native层返回的mPtr
以及阻塞超时timeoutMillis
。先看下nativeInit
方法在native层的实现。
public final class MessageQueue {
private long mPtr;
private native static long nativeInit();
private native void nativePollOnce(long ptr, int timeoutMillis);
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
}
3.4.3.1. nativeInit
// frameworks/base/core/jni/android_os_MessageQueue.cpp
class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
// ...
void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
void wake();
void setFileDescriptorEvents(int fd, int events);
virtual int handleEvent(int fd, int events, void* data);
// ...
private:
JNIEnv* mPollEnv;
jobject mPollObj;
jthrowable mExceptionObj;
};
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
可以看到在native层创建了NativeMessageQueue
实例,可以看到NativeMessageQueue
实例本身没有做什么事情,但是NativeMessageQueue
继承了MessageQueue
,同时为当前线程创建了Looper
实例(native层)并赋值给MessageQueue
的成员变量mLooper
,并且native层的Looper
实例也是线程单例的(通过thread_local
实现)。
// system/core/libutils/Looper.cpp
thread_local static sp<Looper> gThreadLocalLooper;
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// 构造用于唤醒的Fd
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
// 重建Epoll事件
rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
// 关闭已有的epoll实例
if (mEpollFd >= 0) {
mEpollFd.reset();
}
// 调用epoll_create1分配新的epoll实例并注册WakeEventFd用于唤醒。
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
// 构建唤醒事件,在处理管道中的事件时会用到
epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
// 调用epoll_ctl添加WakeEventFd到epoll实例用于监听唤醒事件
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
// 处理其他请求监听的事件,并将这些请求监听的fd添加到新的epoll实例用于监听
for (const auto& [seq, request] : mRequests) {
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
}
}
void Looper::setForThread(const sp<Looper>& looper) {
gThreadLocalLooper = looper;
}
sp<Looper> Looper::getForThread() {
return gThreadLocalLooper;
}
看下来nativeInit
方法主要是在native层创建了对应的NativeMessageQueue
实例和Looper
实例,并将NativeMessageQueue
实例返回给了Java层的MessageQueue
实例来持有。Looper
实例的创建需要分析下,可以看到Looper
的构造函数中,主要做了以下工作:
- 分配新的
epoll
实例并注册WakeEventFd
到新的epoll
实例用于监听唤醒事件; - 循环处理之前的请求,将对应的
fd
注册到新的epoll
实例用于监听响应的事件;
3.4.3.2. nativePollOnce
下面看下nativePollOnce
方法在native层的实现。
// frameworks/base/core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
// 调用Looper->pollOnce进行阻塞
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
通过Java层持有的native层的mPtr
转换为native层的MessageQueue
实例,并调用其pollOnce
方法,最终调用MessageQueue
实例持有的Looper
实例的pollOnce
方法。
// system/core/libutils/Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// mResponseIndex初始值为0,mResponses初始值是空集合
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}
// result初始值是0
if (result != 0) {
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
// 调用pollInner
result = pollInner(timeoutMillis);
}
}
从Looper::pollOnce
方法可以看出,在调用Looper::pollInner
方法之前,mResponses
中是没有元素的,因此直接看Looper::pollInner
方法的实现。
// system/core/libutils/Looper.cpp
int Looper::pollInner(int timeoutMillis) {
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// 设置为true,说明即将进入空闲状态.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 1. epoll_wait监听epoll实例,等待管道写入事件来唤醒,并设置相应的超时
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// 已监听到事件
mPolling = false;
// Acquire lock.
mLock.lock();
// ...
// 如果eventCount为0,说明不是被事件唤醒,而是超时唤醒
if (eventCount == 0) {
// POLL_TIMEOUT为-3
result = POLL_TIMEOUT;
goto Done;
}
// 循环处理接收到的事件
for (int i = 0; i < eventCount; i++) {
const SequenceNumber seq = eventItems[i].data.u64;
uint32_t epollEvents = eventItems[i].events;
if (seq == WAKE_EVENT_FD_SEQ) { // 唤醒事件
if (epollEvents & EPOLLIN) {
awoken(); // 唤醒线程
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else { // 其他类型的事件
const auto& request_it = mRequests.find(seq);
if (request_it != mRequests.end()) {
const auto& request = request_it->second;
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 添加到mResponses中,下面会对mResponses
mResponses.push({.seq = seq, .events = events, .request = request});
} else {
ALOGW("Ignoring unexpected epoll events 0x%x for sequence number %" PRIu64
" that is no longer registered.", epollEvents, seq);
}
}
}
Done: ;
// 处理消息队列总的消息。
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// 移除消息并分发处理消息
{ // 获取handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
handler->handleMessage(message);
} // 释放handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// 没有到达处理时间的消息对应的时间戳是下一次唤醒的时间戳。
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// 调用所有response的callback。
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
AutoMutex _l(mLock);
removeSequenceNumberLocked(response.seq);
}
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
Looper::pollInner
方法首先调用epoll_wait
方法对epoll
实例进行监听,当监听到事件或者超时之后,继续向下执行。如果监听到事件,那么会对事件进行处理,如果事件中包含唤醒事件(WAKE_EVENT_FD_SEQ
且EPOLLIN
)则调用awoken
方法,否则调用事件的callback
。
可以看出,Java层调用nativePollOnce
方法之后,是否阻塞取决于epoll_wait
方法,而epoll_wait
方法主要是监听epoll
实例,等到事件的到来。如果监听到唤醒事件,awoken
方法就会被调用,看下这个方法做了什么。
// system/core/libutils/Looper.cpp
void Looper::awoken() {
uint64_t counter;
// 读取管道里的数据,如果失败就会重试
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}
// prebuilts/gcc/linux-x86/host/x86_64-linux-glibc2.17-4.8/sysroot/usr/include/x86_64-linux-gnu/bits/unistd.h
read (int __fd, void *__buf, size_t __nbytes)
{
if (__bos0 (__buf) != (size_t) -1)
{
if (!__builtin_constant_p (__nbytes))
return __read_chk (__fd, __buf, __nbytes, __bos0 (__buf));
if (__nbytes > __bos0 (__buf))
return __read_chk_warn (__fd, __buf, __nbytes, __bos0 (__buf));
}
return __read_alias (__fd, __buf, __nbytes);
}
可以看到awoken
方法调用到底层的read
方法,尝试读唤醒事件对应的Fd
,如果之前有向唤醒事件写入数据,则立即读出,否则阻塞等待。
3.4.3.3. nativeWake
Java层唤醒Looper
线程是通过nativeWake
方法调用到native层来实现的,因此看下nativeWake
方法的代码。
// frameworks/base/core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
nativeWake
方法通过传入的ptr
找到之前创建的NativeMessageQueue
实例,调用其NativeMessageQueue::wake
方法,最终调用到了Looper::wake
方法。
// system/core/libutils/Looper.cpp
void Looper::wake() {
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
// prebuilts/gcc/linux-x86/host/x86_64-linux-glibc2.17-4.8/sysroot/usr/include/unistd.h
/* 将buf中的N字节写入FD,返回写入的字节数,如果写入失败则返回-1. */
extern ssize_t write (int __fd, const void *__buf, size_t __n) __wur;
看到Looper::wake
方法的实现之后,发现也是调用了write
方法向唤醒事件的Fd
写入1,最终唤醒了监听唤醒事件的线程,这里的write
方法对应nativePollOnce
方法调用到的read
方法。
3.4.4. 小结
通过从Java层到native层的源码分析,可以得出:
- Java层和native层都有一套消息机制,各自独立.业务开发中一般主要和Java层的消息机制打交道;
- Java层负责实现消息的发送以及消息的分发处理,但是不负责
Looper
线程的阻塞和唤醒的实现; - native层负责实现
Looper
线程的阻塞和唤醒的能力,具体是通过Linux
的epoll
机制来实现的; - native层的消息机制后续可以再单独分析,看下Android中哪些消息是在native层进行分发处理的。
4. 其他
4.1. IdleHandler
在分析循环获取消息的源码时,还发现了IdleHandler
,IdleHandler
的执行时机是在每一次获取消息之后,如果没有获取到消息,并且是第一次循环,则会尝试执行IdleHandler
。具体的执行逻辑见下面的代码以及注释。
public final class MessageQueue {
Message next() {
// ...
int pendingIdleHandlerCount = -1; // -1 only during first iteration
for (;;) {
// ...
synchronized (this) {
// ...
// 走到这里说明当前循环没有获取到可以处理的消息
// 如果当前是第一次循环,pendingIdleHandlerCount为-1,因此只要队首消息不是同步屏障,那么就会执行mIdleHandlers中的IdleHandler
if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// mIdleHandlers为空集合,下一次循环进入阻塞状态。
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// 只有第一次循环时才可能执行到这里,因为后面的循环将会因为pendingIdleHandlerCount为0而不去获取mIdleHandlers中的元素
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// ...
}
}
// ...
}
由此可见,IdleHandler
可以用于当Looper线程处于空闲时(当前没有消息可以分发处理时),在进入阻塞状态之前执行一些工作,通常用于启动优化,比如在启动阶段插入一些IdleHandler
用于在主线程的消息队列空闲时执行一些执行时机较早的任务。
但是使用IdleHandler
可能会出现一直得不到执行的问题,比如主线程的消息队列一直在处理消息,此时IdleHandler
将一直得不到执行,因此,IdleHandler
不适用于必须执行,且对执行时机有明确要求的任务场景。
4.2. 卡顿检测
在分析获取到消息之后,对消息进行分发处理的时候发现会在消息分发处理的前后打印日志,因此这里的日志打印可以用于统计每个消息的处理耗时,如果某个消息处理时间过长,那么就可以将其捕获并用于分析当前调用链路中可能存在的耗时问题。
public final class Looper {
@UnsupportedAppUsage
private Printer mLogging;
/**
* Control logging of messages as they are processed by this Looper. If
* enabled, a log message will be written to <var>printer</var>
* at the beginning and ending of each message dispatch, identifying the
* target Handler and message contents.
*
* @param printer A Printer object that will receive log messages, or
* null to disable message logging.
*/
public void setMessageLogging(@Nullable Printer printer) {
mLogging = printer;
}
public static void loop() {
final Looper me = myLooper();
// ...
// 死循环调用loopOnce获取一个消息,如果返回false则说明需要退出死循环,即当前线程结束并退出。
for (;;) {
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}
private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) {
Message msg = me.mQueue.next(); // next方法可能会导致当前线程被阻塞
if (msg == null) { // 获取不到消息说明消息队列已经不再使用
return false;
}
// This must be in a local variable, in case a UI event sets the logger
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " "
+ msg.callback + ": " + msg.what);
}
// ...
try {
msg.target.dispatchMessage(msg);
// ...
} catch (Exception exception) {
// ...
throw exception;
} finally {
// ...
}
// ...
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// ...
return true;
}
}
public interface Printer {
/**
* Write a line of text to the output. There is no need to terminate
* the given string with a newline.
*/
void println(String x);
}
其实BlockCanary
的实现原理就是基于上述的日志打印的,通过自定义Printer并注入给Looper
对象,这样就可以对消息处理耗时统计,如果发现执行时间过长,则将其作为卡顿进行捕获并分析。
但是,基于这种方案实现的BlockCanary
可能存在不准确的问题,比如一次卡顿分布在连续的两个消息上,这个时候单独的消息没有被认为是卡顿,但是实际上,确实给用户带来了卡顿的体验。
此外,BlockCanary
是基于开启一个Looper
线程用于倒计时,本身就涉及到线程的调度,无法保证超时消息一定会正常处理。
如何解决BlockCanary
存在的问题?
4.3. 内存泄漏
内存泄漏的本质是不再使用的对象被其他正在使用的对象持有,导致一直无法被GC
回收。如果开发中在使用Handler
的时候方式不当,可能会导致内存泄漏问题。
比如自定义Handler
类是一个内部类,其持有的外部类是Activity
这种有生命周期的类,如果持有Handler
实例的消息Message
对象一直没有被处理,可能导致Activity
实例一直无法释放;
再比如Handler
实例是一个匿名内部类的实例,其持有的外部类是Activity
这种有生命周期的类,如果持有Handler
实例的消息Message
对象一直没有被处理,同样可能导致Activity
实例一直无法释放;
5. 总结
本文首先介绍了Handler
消息机制出现的背景以及原因,描述了其主要的作用和目的,接着介绍了开发过程中Handler
的使用方式,然后基于使用方式分析了Handler
消息机制的实现原理,最后介绍了Handler
机制的一些扩展应用场景。