当前位置: 首页 > article >正文

ActivityManagerService 分发广播(6)

ActivityManagerService 分发广播

简述

上一节我们看了发送广播流程,主要是将广播信息封装至BroadcastRecord,然后通过BroadcastQueueImpl/BroadcastQueueModernImpl的enqueueBroadcastLocked来发送广播。
这一节我们来看一下AMS是怎么分发广播的流程,BroadcastQueueImpl/BroadcastQueueModernImpl是两套策略,只是分发的模式不同,我们主要关心流程,策略细节就不去关注了,我们这里就挑BroadcastQueueImpl来看一下分发的流程。

广播分发

前面提到过BroadcastQueueImpl模式下,会有四个队列来发送广播,流程上四个队列都是一样的,只是有不同的优先级。
我们从BroadcastQueueImpl.enqueueBroadcastLocked开始看。
在这里插入图片描述

1.1 BroadcastQueueImpl.enqueueBroadcastLocked
区分有序广播和无序广播,如果有序广播,可能还需要替换队列中之前的BroadcastRecord。
有序和无序广播会添加到BroadcastDispatcher不同的队列中。
最终都会通过调用scheduleBroadcastsLocked来触发广播的分发。

public void enqueueBroadcastLocked(BroadcastRecord r) {
    r.applySingletonPolicy(mService);
    // flag如果有FLAG_RECEIVER_REPLACE_PENDING,则说明新的广播需要替代之前的
    final boolean replacePending = (r.intent.getFlags()
            & Intent.FLAG_RECEIVER_REPLACE_PENDING) != 0;

    // 有序广播需要顺序分发
    boolean serialDispatch = r.ordered;
    if (!serialDispatch) {
        final int N = (r.receivers != null) ? r.receivers.size() : 0;
        for (int i = 0; i < N; i++) {
            if (r.receivers.get(i) instanceof ResolveInfo) {
                serialDispatch = true;
                break;
            }
        }
    }

    if (serialDispatch) {
        // BroadcastDispatcher里面有一个mOrderedBroadcasts,如果之前已经发送过一样的广播,需要replace mOrderedBroadcasts里面之前的。
        final BroadcastRecord oldRecord =
                replacePending ? replaceOrderedBroadcastLocked(r) : null;
        if (oldRecord != null) {
            // 如果有替换,调用performReceiveLocked
            if (oldRecord.resultTo != null) {
                try {
                    oldRecord.mIsReceiverAppRunning = true;
                    performReceiveLocked(oldRecord, oldRecord.resultToApp, oldRecord.resultTo,
                            oldRecord.intent,
                            Activity.RESULT_CANCELED, null, null,
                            false, false, oldRecord.shareIdentity, oldRecord.userId,
                            oldRecord.callingUid, r.callingUid, r.callerPackage,
                            SystemClock.uptimeMillis() - oldRecord.enqueueTime, 0, 0,
                            oldRecord.resultToApp != null
                                    ? oldRecord.resultToApp.mState.getCurProcState()
                                    : ActivityManager.PROCESS_STATE_UNKNOWN);
                } catch (RemoteException e) {
                    // ...
                }
            }
        } else {
            // 否则添加到BroadcastDispatcher的队列中去。
            enqueueOrderedBroadcastLocked(r);
            // 最终需要调用scheduleBroadcastsLocked来发送广播,详见1.2
            scheduleBroadcastsLocked();
        }
    } else {
        final boolean replaced = replacePending
                && (replaceParallelBroadcastLocked(r) != null);
        // 无序广播
        if (!replaced) {
            // 添加到BroadcastDispatcher的队列中去
            enqueueParallelBroadcastLocked(r);
            // 最终需要调用scheduleBroadcastsLocked来发送广播,详见1.2
            scheduleBroadcastsLocked();
        }
    }
}

1.2 BroadcastQueueImpl.scheduleBroadcastsLocked
发送一个BROADCAST_INTENT_MSG消息

public void scheduleBroadcastsLocked() {
    // ...log
    // 防重入
    if (mBroadcastsScheduled) {
        return;
    }
    // 发送BROADCAST_INTENT_MSG消息
    mHandler.sendMessage(mHandler.obtainMessage(BROADCAST_INTENT_MSG, this));
    mBroadcastsScheduled = true;
}

1.3 BroadcastHandler.handleMessage
调用processNextBroadcast

private final class BroadcastHandler extends Handler {

    @Override
    public void handleMessage(Message msg) {
        switch (msg.what) {
            case BROADCAST_INTENT_MSG: {
                if (DEBUG_BROADCAST) Slog.v(
                        TAG_BROADCAST, "Received BROADCAST_INTENT_MSG ["
                        + mQueueName + "]");
                // 调用 processNextBroadcast,详见1.4
                processNextBroadcast(true);
            } break;
            case BROADCAST_TIMEOUT_MSG: {
                synchronized (mService) {
                    // 这里是广播超时,触发ANR的地方,我们不是在说稳定性,这里就不关注了
                    broadcastTimeoutLocked(true);
                }
            } break;
        }
    }
}

1.4 BroadcastQueueImpl.processNextBroadcast
加了AMS大锁,然后调用processNextBroadcastLocked

private void processNextBroadcast(boolean fromMsg) {
    synchronized (mService) {
        processNextBroadcastLocked(fromMsg, false);
    }
}

1.5 BroadcastQueueImpl.processNextBroadcastLocked
首先先通过deliverToRegisteredReceiverLocked分发无序广播,然后再来处理有序广播。
有序广播需要一个一个receiver来分发,所以先要找到下一个receiver。这里有一个延迟策略,有一些广播app侧处理起来比较耗时,例如需要启动进程。
这种情况下就会将广播进行拆分,新建一个BroadcastRecord,它的receiver只有刚刚需要延迟的那一个,然后在原来的BroadcastRecord中删除这个receiver。
找到下一个receiver后,先设置广播的Timeout超时逻辑,这里相当于ANR埋雷,然后获取receiver的进程信息,检查是否需要skip该receiver,如果不需要则检查进程是否已经运行,如果已经运行就调用processCurBroadcastLocked来分发广播,否则会先启动目标进程,启动进程的逻辑和之前Activity启动进程逻辑类似,就不说了。

public void processNextBroadcastLocked(boolean fromMsg, boolean skipOomAdj) {
    BroadcastRecord r;

    // ...

    // 先分发无序广播
    while (mParallelBroadcasts.size() > 0) {
        r = mParallelBroadcasts.remove(0);
        r.dispatchTime = SystemClock.uptimeMillis();
        r.dispatchRealTime = SystemClock.elapsedRealtime();
        r.dispatchClockTime = System.currentTimeMillis();
        r.mIsReceiverAppRunning = true;

        // ...

        final int N = r.receivers.size();
        if (DEBUG_BROADCAST_LIGHT) Slog.v(TAG_BROADCAST, "Processing parallel broadcast ["
                + mQueueName + "] " + r);
        for (int i=0; i<N; i++) {
            Object target = r.receivers.get(i);
            // 分发无序广播
            deliverToRegisteredReceiverLocked(r,
                    (BroadcastFilter) target, false, i);
        }
        addBroadcastToHistoryLocked(r);
    }

    // 如果有mPendingBroadcast,正在发送的广播,什么都不做,只是检查一下对应进程是否还活着,如果活着就直接return。
    // 否则需要继续发送广播的流程
    if (mPendingBroadcast != null) {
        boolean isDead;
        if (mPendingBroadcast.curApp.getPid() > 0) {
            synchronized (mService.mPidsSelfLocked) {
                ProcessRecord proc = mService.mPidsSelfLocked.get(
                        mPendingBroadcast.curApp.getPid());
                isDead = proc == null || proc.mErrorState.isCrashing();
            }
        } else {
            final ProcessRecord proc = mService.mProcessList.getProcessNamesLOSP().get(
                    mPendingBroadcast.curApp.processName, mPendingBroadcast.curApp.uid);
            isDead = proc == null || !proc.isPendingStart();
        }
        if (!isDead) {
            // 进程还活着,什么都不做直接return
            return;
        } else {
            mPendingBroadcast.state = BroadcastRecord.IDLE;
            mPendingBroadcast.nextReceiver = mPendingBroadcastRecvIndex;
            mPendingBroadcast = null;
        }
    }

    boolean looped = false;
    // 循环处理广播,寻找下一个我们需要发送的广播以及receiver
    do {
        final long now = SystemClock.uptimeMillis();
        r = mDispatcher.getNextBroadcastLocked(now);

        if (r == null) {
            // ...
            // 没有广播需要分发,直接return
            return;
        }

        boolean forceReceive = false;

        // 
        int numReceivers = (r.receivers != null) ? r.receivers.size() : 0;
        if (mService.mProcessesReady && !r.timeoutExempt && r.dispatchTime > 0) {
            if ((numReceivers > 0) &&
                    (now > r.dispatchTime + (2 * mConstants.TIMEOUT * numReceivers))) {
                // ... 如果广播处理太慢超时,强制停止广播
                broadcastTimeoutLocked(false); // forcibly finish this broadcast
                forceReceive = true;
                r.state = BroadcastRecord.IDLE;
            }
        }

        if (r.state != BroadcastRecord.IDLE) {
            // ...如果当前广播不是IDLE状态,则直接返回
            return;
        }

        // 检查广播发送是否已经完成
        if (r.receivers == null || r.nextReceiver >= numReceivers
                || r.resultAbort || forceReceive) {
            if (r.resultTo != null) {
                boolean sendResult = true;
                // ...
                if (sendResult) {
                    // ...
                        // 如果需要发送结果
                        performReceiveLocked(r, r.resultToApp, r.resultTo,
                                new Intent(r.intent), r.resultCode,
                                r.resultData, r.resultExtras, false, false, r.shareIdentity,
                                r.userId, r.callingUid, r.callingUid, r.callerPackage,
                                r.dispatchTime - r.enqueueTime,
                                now - r.dispatchTime, 0,
                                r.resultToApp != null
                                        ? r.resultToApp.mState.getCurProcState()
                                        : ActivityManager.PROCESS_STATE_UNKNOWN);
                    // ...
                }
            }

            // ...清除广播超时计时器
            cancelBroadcastTimeoutLocked();

            // ... 
            mDispatcher.retireBroadcastLocked(r);
            r = null;
            looped = true;
            // 当前广播已经完成,则进入下一次循环
            continue;
        }

        // 检查下一个receiver是否在defer策略下。
        if (!r.deferred) {
            final int receiverUid = r.getReceiverUid(r.receivers.get(r.nextReceiver));
            if (mDispatcher.isDeferringLocked(receiverUid)) {
                // 如果这个是最后一个receiver,就没有必要做拆分操作了,就正常推迟即可。
                BroadcastRecord defer;
                if (r.nextReceiver + 1 == numReceivers) {
                    defer = r;
                    mDispatcher.retireBroadcastLocked(r);
                } else {
                    // 进行分割操作,这里会新构建一个BroadcastRecord,接收者只有这一个uid,单独来发送
                    defer = r.splitRecipientsLocked(receiverUid, r.nextReceiver);
                    // ...
                    // Track completion refcount as well if relevant
                    if (r.resultTo != null) {
                        int token = r.splitToken;
                        // 分割的计数。
                        if (token == 0) {
                            r.splitToken = defer.splitToken = nextSplitTokenLocked();
                            mSplitRefcounts.put(r.splitToken, 2);
                        } else {
                            final int curCount = mSplitRefcounts.get(token);
                            mSplitRefcounts.put(token, curCount + 1);
                        }
                    }
                }
                // 添加分割后需要延迟的广播
                mDispatcher.addDeferredBroadcast(receiverUid, defer);
                r = null;
                looped = true;
                continue;
            }
        }
    } while (r == null);

    int recIdx = r.nextReceiver++;

    r.receiverTime = SystemClock.uptimeMillis();
    r.scheduledTime[recIdx] = r.receiverTime;
    if (recIdx == 0) {
        r.dispatchTime = r.receiverTime;
        r.dispatchRealTime = SystemClock.elapsedRealtime();
        r.dispatchClockTime = System.currentTimeMillis();
        // ... trace/log
    }
    if (! mPendingBroadcastTimeoutMessage) {
        long timeoutTime = r.receiverTime + mConstants.TIMEOUT;
        // 设置广播超时
        setBroadcastTimeoutLocked(timeoutTime);
    }

    final BroadcastOptions brOptions = r.options;
    final Object nextReceiver = r.receivers.get(recIdx);

    //...
    // ...获取receiver进程信息

    // 判断是否需要跳过当前的receiver
    boolean skip = mSkipPolicy.shouldSkip(r, info);

    Bundle filteredExtras = null;
    if (!skip && r.filterExtrasForReceiver != null) {
        final Bundle extras = r.intent.getExtras();
        if (extras != null) {
            filteredExtras = r.filterExtrasForReceiver.apply(receiverUid, extras);
            if (filteredExtras == null) {
                skip = true;
            }
        }
    }
    // 如果需要跳过,就调用scheduleBroadcastsLocked进入下一次广播分发逻辑
    if (skip) {
        r.delivery[recIdx] = BroadcastRecord.DELIVERY_SKIPPED;
        r.curFilter = null;
        r.state = BroadcastRecord.IDLE;
        r.manifestSkipCount++;
        scheduleBroadcastsLocked();
        return;
    }
    r.manifestCount++;

    r.delivery[recIdx] = BroadcastRecord.DELIVERY_DELIVERED;
    r.state = BroadcastRecord.APP_RECEIVE;
    r.curComponent = component;
    r.curReceiver = info.activityInfo;
    r.curFilteredExtras = filteredExtras;

    final boolean isActivityCapable =
            (brOptions != null && brOptions.getTemporaryAppAllowlistDuration() > 0);
    maybeScheduleTempAllowlistLocked(receiverUid, r, brOptions);

    // ...

    // 检查receiver目标app是否已经启动
    if (app != null && app.getThread() != null && !app.isKilled()) {
        try {
            app.addPackage(info.activityInfo.packageName,
                    info.activityInfo.applicationInfo.longVersionCode, mService.mProcessStats);
            maybeAddBackgroundStartPrivileges(app, r);
            r.mIsReceiverAppRunning = true;
            // 发送广播,详见1.6
            processCurBroadcastLocked(r, app);
            return;
        }
        // ...
    }

    r.mWasReceiverAppStopped =
            (info.activityInfo.applicationInfo.flags & ApplicationInfo.FLAG_STOPPED) != 0;
    //  如果目标进程没有启动,这里需要先启动进程。
    r.curApp = mService.startProcessLocked(targetProcess,
            info.activityInfo.applicationInfo, true,
            r.intent.getFlags() | Intent.FLAG_FROM_BACKGROUND,
            new HostingRecord(HostingRecord.HOSTING_TYPE_BROADCAST, r.curComponent,
                    r.intent.getAction(), r.getHostingRecordTriggerType()),
            isActivityCapable ? ZYGOTE_POLICY_FLAG_LATENCY_SENSITIVE : ZYGOTE_POLICY_FLAG_EMPTY,
            (r.intent.getFlags() & Intent.FLAG_RECEIVER_BOOT_UPGRADE) != 0, false);
    r.curAppLastProcessState = ActivityManager.PROCESS_STATE_NONEXISTENT;
    if (r.curApp == null) {
        // 如果启动进程失败,结束当前的广播
        logBroadcastReceiverDiscardLocked(r);
        finishReceiverLocked(r, r.resultCode, r.resultData,
                r.resultExtras, r.resultAbort, false);
        // 重新调用scheduleBroadcastsLocked发送广播
        scheduleBroadcastsLocked();
        r.state = BroadcastRecord.IDLE;
        return;
    }

    maybeAddBackgroundStartPrivileges(r.curApp, r);
    mPendingBroadcast = r;
    mPendingBroadcastRecvIndex = recIdx;
}

1.6 BroadcastQueueImpl.processCurBroadcastLocked
通过ActivityThread.scheduleReceiver binder回调回app侧执行广播。

private final void processCurBroadcastLocked(BroadcastRecord r,
        ProcessRecord app) throws RemoteException {
    // ...
    // 跟新adj
    mService.enqueueOomAdjTargetLocked(app);
    mService.updateOomAdjPendingTargetsLocked(OOM_ADJ_REASON_START_RECEIVER);

    // ...

    boolean started = false;
    try {
        mService.notifyPackageUse(r.intent.getComponent().getPackageName(),
                                  PackageManager.NOTIFY_PACKAGE_USE_BROADCAST_RECEIVER);
        final boolean assumeDelivered = false;
        // 通过binder回调App侧,分发广播,详见1.7
        thread.scheduleReceiver(
                prepareReceiverIntent(r.intent, r.curFilteredExtras),
                r.curReceiver, null /* compatInfo (unused but need to keep method signature) */,
                r.resultCode, r.resultData, r.resultExtras, r.ordered, assumeDelivered,
                r.userId, r.shareIdentity ? r.callingUid : Process.INVALID_UID,
                app.mState.getReportedProcState(),
                r.shareIdentity ? r.callerPackage : null);
        started = true;
    } finally {
        // ...
    }

    if (app.isKilled()) {
        throw new RemoteException("app gets killed during broadcasting");
    }
}

1.7 ActivityThread.scheduleReceiver
将广播封装到ReceiverData,发送H.RECEIVER消息

public final void scheduleReceiver(Intent intent, ActivityInfo info,
        CompatibilityInfo compatInfo, int resultCode, String data, Bundle extras,
        boolean ordered, boolean assumeDelivered, int sendingUser, int processState,
        int sendingUid, String sendingPackage) {
    updateProcessState(processState, false);
    // 封装ReceiverData
    ReceiverData r = new ReceiverData(intent, resultCode, data, extras,
            ordered, false, assumeDelivered, mAppThread.asBinder(), sendingUser,
            sendingUid, sendingPackage);
    r.info = info;
    // 发送H.RECEIVER消息,详见1.8
    sendMessage(H.RECEIVER, r);
}

1.8 H.handleMessage
调用handleReceiver

public void handleMessage(Message msg) {
    // ...  
        case RECEIVER:
            if (Trace.isTagEnabled(Trace.TRACE_TAG_ACTIVITY_MANAGER)) {
                ReceiverData rec = (ReceiverData) msg.obj;
            // ...
            // 详见1.9
            handleReceiver((ReceiverData)msg.obj);
            Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
            break;
    // ...
}

1.9 ActivityThread.handleReceiver
这里通过反射构造出对应BroadcastReceiver,然后调用了Receiver的onReceive,后续就是我们应用开发时候定义广播接收器,复写onReceive自己定义的逻辑了。

private void handleReceiver(ReceiverData data) {
    // ...
    try {
        // ...
        java.lang.ClassLoader cl = context.getClassLoader();
        data.intent.setExtrasClassLoader(cl);
        data.intent.prepareToEnterProcess(
                isProtectedComponent(data.info) || isProtectedBroadcast(data.intent),
                context.getAttributionSource());
        data.setExtrasClassLoader(cl);
        // 通过反射构造对应是BroadcastReceiver
        receiver = packageInfo.getAppFactory()
                .instantiateReceiver(cl, data.info.name, data.intent);
    } catch (Exception e) {
        // ...
    }

    try {
        // ...
        // 调用广播的onReceive
        sCurrentBroadcastIntent.set(data.intent);
        receiver.setPendingResult(data);
        receiver.onReceive(context.getReceiverRestrictedContext(),
                data.intent);
    } catch (Exception e) {
        // ...
    } finally {
        sCurrentBroadcastIntent.set(null);
    }

    if (receiver.getPendingResult() != null) {
        data.finish();
    }
}

小结

本章主要介绍了一下广播分发的流程,虽然有一些方法比较长,但是核心的逻辑是比较简单的,主要是一些分发的策略逻辑比较多。
发送广播时会把广播数据放到队列中,然后分发广播会不断调用scheduleBroadcastsLocked来处理广播的分发。
广播分发分为无序广播和有序广播,无序广播分发逻辑比较简单,直接分发即可。而有序广播有一些额外策略,例如如果有处理广播时间较长的情况,如目标进程未启动需要现在启动进程,会将广播单独拆分开,防止单个应用处理广播太慢广播影响其他应用。
分发到应用通过ActivityThread.scheduleReceiver的binder调用通知到应用,而应用会通过反射构造对应的BroadcastReceiver,然后调用它的onReceiver,之后就是app侧复写onReceive的逻辑了,这里的其实和Activity的onCreate操作很像。


http://www.kler.cn/a/313171.html

相关文章:

  • 高效稳定!新加坡服务器托管方案助力企业全球化布局
  • PNG图片批量压缩exe工具+功能纯净+不改变原始尺寸
  • Spring高手之路26——全方位掌握事务监听器
  • 比ChatGPT更酷的AI工具
  • 大厂的 404 页面都长啥样?看看你都见过吗~~~
  • ubuntu-desktop-24.04上手指南(更新阿里源、安装ssh、安装chrome、设置固定IP、安装搜狗输入法)
  • Vue3:reactive丢失响应式,数据有更新但表单没有更新
  • gin配置swagger文档
  • 树与图的深度优先遍历(dfs的图论中的应用)
  • 【CPP】类与继承
  • [原创]全新安装最新版Delphi 12.2之前, 如何正确卸载旧版Delphi 12.1?
  • 谈对象第二弹: C++类和对象(中)
  • SQLiteHelper
  • Java:List<String> 转换List<BigDecimal> 并求和
  • Hadoop-MapReduce的 原理 | 块和片 | Shuffle 过程 | Combiner
  • go 战略
  • Observability:构建下一代托管接入服务
  • Linux文件IO(四)-返回错误处理与errno详解
  • 【数据结构与算法】LeetCode:双指针法
  • 基于STM32F103C8T6单片机的DDS信号源设计
  • 海洋大地测量基准与水下导航系列之二国外海底大地测量基准和海底观测网络发展现状(上)
  • mac中git操作账号的删除
  • 【linux】4张卡,坏了1张,怎么办?
  • ActivityManagerService Activity的启动流程(2)
  • Windows10、CentOS Stream9 环境下安装kafka_2.12-3.6.2记录
  • Oracle数据库pl/sql显式抛出异常