WorkManager

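The WorkManager API makes it easy to schedule deferrable, asynchronous tasks that must run reliably. With these APIs you create a task and hand it to WorkManager, which runs it once the work's constraints are met.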

1. Usage

1.1 Basic usage

Define the work:

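import android.content.Context;
import android.util.Log;

import androidx.annotation.NonNull;
import androidx.work.Data;
import androidx.work.Worker;
import androidx.work.WorkerParameters;

public class MainWorker1 extends Worker {

    private static final String TAG = MainWorker1.class.getSimpleName();

    public MainWorker1(@NonNull Context context, @NonNull WorkerParameters workerParams) {
        super(context, workerParams);
    }

    // Runs synchronously on a background thread provided by WorkManager;
    // the returned Result reports the outcome.
    @NonNull
    @Override
    public Result doWork() {
        // Read the input data passed to this work request
        String value = getInputData().getString("key");
        Log.d(TAG, "doWork: " + value);
        // Output data is wrapped in the Result that is returned
        Data outputData = new Data.Builder()
                .putString("result", "result")
                .build();
        // Result.success() is the public factory; constructing Result.Success
        // directly is a restricted API and needs a lint suppression.
        return Result.success(outputData);
    }
}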

Use the work:

    public void onTest1Click(View view) {
        // Input data passed to the work
        Data inputData = new Data.Builder()
                .putString("key", "value111")
                .build();

        OneTimeWorkRequest request = new OneTimeWorkRequest.Builder(MainWorker1.class)
                .setInputData(inputData)
                .build();

        // Observe the WorkInfo, which carries both the output data and the run state
        WorkManager.getInstance(this).getWorkInfoByIdLiveData(request.getId())
                .observe(this, new Observer<WorkInfo>() {
                    @Override
                    public void onChanged(WorkInfo workInfo) {
                        // Read the data returned by the Worker
                        Log.d(TAG, "Output data: " + workInfo.getOutputData().getString("result"));

                        // Log the state of the work: ENQUEUED, RUNNING, then SUCCEEDED here;
                        // the other possible states are FAILED, BLOCKED and CANCELLED
                        Log.d(TAG, "onChanged: " + workInfo.getState().name());

                        if (workInfo.getState().isFinished()) {
                            Log.d(TAG, "onChanged: work finished!");
                        }
                    }
                });

        // Enqueue the work
        WorkManager.getInstance(this).enqueue(request);
    }

Log output:

D/MainWorker1: doWork: value111
D/BaseActivity: Output data: null
D/BaseActivity: onChanged: ENQUEUED
D/BaseActivity: Output data: null
D/BaseActivity: onChanged: RUNNING
D/BaseActivity: Output data: result
D/BaseActivity: onChanged: SUCCEEDED
D/BaseActivity: onChanged: work finished!

As the log shows, the Worker's output data is still null while the work is ENQUEUED or RUNNING; it only becomes available once the work has finished (SUCCEEDED here).

1.2 Chaining work

	WorkManager.getInstance(myContext)
	   // Candidates to run in parallel
	   .beginWith(Arrays.asList(plantName1, plantName2, plantName3))
	   // Dependent work (only runs after all previous work in chain)
	   .then(cache)
	   .then(upload)
	   // Call enqueue to kick things off
	   .enqueue();
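The ordering that such a chain guarantees can be modelled without Android at all. The following is a minimal plain-Java sketch, assuming CompletableFuture as a stand-in for WorkManager; ChainSketch and runChain are hypothetical names used only for illustration, and the step names simply mirror the snippet above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-Java model of beginWith(a, b, c).then(cache).then(upload).
// It only illustrates the ordering guarantee; it does not use WorkManager.
final class ChainSketch {

    static List<String> runChain() {
        List<String> log = new CopyOnWriteArrayList<>();

        // The three "plantName" steps may run in parallel, in any order.
        CompletableFuture<Void> p1 = CompletableFuture.runAsync(() -> log.add("plantName1"));
        CompletableFuture<Void> p2 = CompletableFuture.runAsync(() -> log.add("plantName2"));
        CompletableFuture<Void> p3 = CompletableFuture.runAsync(() -> log.add("plantName3"));

        // Dependent steps start only after every parallel step has finished.
        CompletableFuture.allOf(p1, p2, p3)
                .thenRun(() -> log.add("cache"))
                .thenRun(() -> log.add("upload"))
                .join();
        return log;
    }

    public static void main(String[] args) {
        // The first three entries can appear in any order; the last two cannot.
        System.out.println(runChain());
    }
}
```

Whatever order the three parallel steps finish in, "cache" is always the fourth entry and "upload" the fifth.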

1.3 Periodic work

	public void onTestPeriodicWorkRequest(View view) {
        // Even though a 10-second interval is requested here, WorkManager enforces a
        // minimum repeat interval of 15 minutes for periodic work
        PeriodicWorkRequest periodicWorkRequest = new PeriodicWorkRequest.Builder(MainWorker1.class,
                10, TimeUnit.SECONDS).build();
        WorkManager.getInstance(this).getWorkInfoByIdLiveData(periodicWorkRequest.getId())
                .observe(this, new Observer<WorkInfo>() {
                    @Override
                    public void onChanged(WorkInfo workInfo) {
                        Log.d(TAG, "Work state: " + workInfo.getState().name());
                        if (workInfo.getState().isFinished()) {
                            Log.d(TAG, "Work finished!");
                        }
                    }
                });
        WorkManager.getInstance(this).enqueue(periodicWorkRequest);
    }

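Two things to note about periodic work:

  1. If the repeat interval of a PeriodicWorkRequest is set below 15 minutes, WorkManager raises it to 15 minutes.
  2. Because the work repeats, the observed state goes ENQUEUED, RUNNING, and back to ENQUEUED once a run completes. The cycle repeats roughly every 15 minutes, and SUCCEEDED is never reported.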
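The first point amounts to a simple clamp. Here is a minimal plain-Java sketch of that rule; PeriodicIntervalSketch, MIN_INTERVAL_MILLIS and effectiveIntervalMillis are hypothetical names for illustration, not WorkManager API.

```java
import java.util.concurrent.TimeUnit;

// Models the 15-minute floor applied to periodic work intervals.
final class PeriodicIntervalSketch {

    // Assumed name; it mirrors the 15-minute minimum described above.
    static final long MIN_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(15);

    // Returns the interval that would actually be used, in milliseconds.
    static long effectiveIntervalMillis(long interval, TimeUnit unit) {
        return Math.max(MIN_INTERVAL_MILLIS, unit.toMillis(interval));
    }

    public static void main(String[] args) {
        // A 10-second request is raised to 15 minutes (900000 ms).
        System.out.println(effectiveIntervalMillis(10, TimeUnit.SECONDS));
        // A 1-hour request is kept as is (3600000 ms).
        System.out.println(effectiveIntervalMillis(1, TimeUnit.HOURS));
    }
}
```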

1.4 Constraints

    @RequiresApi(api = Build.VERSION_CODES.M)
    public void testBackgroundWork5(View view) {
        // The work only runs when all constraints are met:
        // network connected, device charging, and device idle
        Constraints constraints = new Constraints.Builder()
                .setRequiredNetworkType(NetworkType.CONNECTED) // network is connected
                .setRequiresCharging(true) // device is charging
                .setRequiresDeviceIdle(true) // device is idle (API 23+)
                .build();

        /**
         * Constraints.Builder offers the following conditions for running work:
         *  setRequiredNetworkType: the network type that is required
         *  setRequiresBatteryNotLow: run only when the battery is not low, default false
         *  setRequiresCharging: run only while the device is plugged in, default false
         *  setRequiresDeviceIdle: run only while the device is idle, default false
         *  setRequiresStorageNotLow: run only when free storage is not below the critical threshold
         */

        // Build the request
        OneTimeWorkRequest request = new OneTimeWorkRequest.Builder(MainWorker3.class)
                .setConstraints(constraints) // attach the constraints to the request
                .build();

        // Enqueue it
        WorkManager.getInstance(this).enqueue(request);
    }

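All information about a work request is persisted in a Room database, so enqueued work survives the app process being killed or the device rebooting, and it still runs once its constraints are met. In that sense the work is decoupled from the lifetime of the app process. (Uninstalling the app removes its data, and the pending work with it.)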

2. How it works

2.1 Initialization

Every use starts by obtaining a WorkManager instance through WorkManager.getInstance() and then enqueueing work on it:

	public static @NonNull WorkManager getInstance(@NonNull Context context) {
        return WorkManagerImpl.getInstance(context);
    }

WorkManagerImpl returns a singleton:

	@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
    public static @NonNull WorkManagerImpl getInstance(@NonNull Context context) {
        synchronized (sLock) {
            WorkManagerImpl instance = getInstance();
            if (instance == null) {
                Context appContext = context.getApplicationContext();
                // To create the instance on demand, appContext must implement Configuration.Provider
                if (appContext instanceof Configuration.Provider) {
                    initialize(
                            appContext,
                            ((Configuration.Provider) appContext).getWorkManagerConfiguration());
                    instance = getInstance(appContext);
                } else {
                    throw new IllegalStateException("WorkManager is not initialized properly.  You "
                            + "have explicitly disabled WorkManagerInitializer in your manifest, "
                            + "have not manually called WorkManager#initialize at this point, and "
                            + "your Application does not implement Configuration.Provider.");
                }
            }

            return instance;
        }
    }

Initialization actually involves two static instances in WorkManagerImpl: sDefaultInstance and sDelegatedInstance. Start with the no-argument getInstance(): it returns sDelegatedInstance if that is non-null, and sDefaultInstance otherwise:

	
	@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
    public static @Nullable WorkManagerImpl getInstance() {
        synchronized (sLock) {
            if (sDelegatedInstance != null) {
                return sDelegatedInstance;
            }

            return sDefaultInstance;
        }
    }

If that method returns null, getInstance(Context) has to call initialize() itself, which is only possible when the application Context implements Configuration.Provider:

	@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
    public static void initialize(@NonNull Context context, @NonNull Configuration configuration) {
        synchronized (sLock) {
            if (sDelegatedInstance != null && sDefaultInstance != null) {
                throw new IllegalStateException("WorkManager is already initialized.  Did you "
                        + "try to initialize it manually without disabling "
                        + "WorkManagerInitializer? See "
                        + "WorkManager#initialize(Context, Configuration) or the class level "
                        + "Javadoc for more information.");
            }

            if (sDelegatedInstance == null) {
                context = context.getApplicationContext();
                if (sDefaultInstance == null) {
                    sDefaultInstance = new WorkManagerImpl(
                            context,
                            configuration,
                            new WorkManagerTaskExecutor(configuration.getTaskExecutor()));
                }
                sDelegatedInstance = sDefaultInstance;
            }
        }
    }
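The lookup-then-initialize flow above can be condensed into a small plain-Java model. This is only a sketch under simplifying assumptions: InstanceLookupSketch, ConfigProvider and peek() are hypothetical stand-ins (for WorkManagerImpl, Configuration.Provider and the no-argument getInstance()), and the configuration is reduced to a String.

```java
// Plain-Java model of the two-instance singleton lookup shown above.
final class InstanceLookupSketch {

    // Stand-in for Configuration.Provider.
    interface ConfigProvider {
        String config();
    }

    private static final Object LOCK = new Object();
    private static InstanceLookupSketch sDelegated;
    private static InstanceLookupSketch sDefault;

    final String config;

    private InstanceLookupSketch(String config) {
        this.config = config;
    }

    // Like the no-argument getInstance(): may return null.
    static InstanceLookupSketch peek() {
        synchronized (LOCK) {
            return sDelegated != null ? sDelegated : sDefault;
        }
    }

    // Like initialize(): a second call is a programming error.
    static void initialize(String config) {
        synchronized (LOCK) {
            if (sDelegated != null && sDefault != null) {
                throw new IllegalStateException("already initialized");
            }
            if (sDelegated == null) {
                if (sDefault == null) {
                    sDefault = new InstanceLookupSketch(config);
                }
                sDelegated = sDefault;
            }
        }
    }

    // Like getInstance(Context): initializes on demand if the context can supply a config.
    static InstanceLookupSketch getInstance(Object appContext) {
        synchronized (LOCK) {
            InstanceLookupSketch instance = peek();
            if (instance == null) {
                if (!(appContext instanceof ConfigProvider)) {
                    throw new IllegalStateException("not initialized");
                }
                initialize(((ConfigProvider) appContext).config());
                instance = peek();
            }
            return instance;
        }
    }
}
```

Once any caller has initialized the instance, later calls to getInstance() return the same object regardless of the context passed in, while a second explicit initialize() fails fast.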

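In practice, this initialization is usually not triggered by our own code. In the WorkManager version examined here, it is performed by WorkManagerInitializer, a ContentProvider that ships inside the library. Its declaration can be found in the APK's merged AndroidManifest: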

		<provider
            android:name="androidx.work.impl.WorkManagerInitializer"
            android:exported="false"
            android:multiprocess="true"
            android:authorities="com.demo.workmanager.workmanager-init"
            android:directBootAware="false" />

The first initialization happens in WorkManagerInitializer.onCreate():

@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class WorkManagerInitializer extends ContentProvider {
    @Override
    public boolean onCreate() {
        // This ends up calling WorkManagerImpl.initialize()
        // Initialize WorkManager with the default configuration.
        WorkManager.initialize(getContext(), new Configuration.Builder().build());
        return true;
    }
}

Next, look at what the WorkManagerImpl constructor actually sets up:

	@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
    public WorkManagerImpl(
            @NonNull Context context,
            @NonNull Configuration configuration,
            @NonNull TaskExecutor workTaskExecutor,
            @NonNull WorkDatabase database) {
        Context applicationContext = context.getApplicationContext();
        Logger.setLogger(new Logger.LogcatLogger(configuration.getMinimumLoggingLevel()));
        // Create the Schedulers; one of them is GreedyScheduler, which starts eligible work right away
        List<Scheduler> schedulers = createSchedulers(applicationContext, workTaskExecutor);
        Processor processor = new Processor(
                context,
                configuration,
                workTaskExecutor,
                database,
                schedulers);
        internalInit(context, configuration, workTaskExecutor, database, schedulers, processor);
    }

	private void internalInit(@NonNull Context context,
            @NonNull Configuration configuration,
            @NonNull TaskExecutor workTaskExecutor,
            @NonNull WorkDatabase workDatabase,
            @NonNull List<Scheduler> schedulers,
            @NonNull Processor processor) {

        context = context.getApplicationContext();
        mContext = context;
        // Keep the configuration (executors and so on)
        mConfiguration = configuration;
        mWorkTaskExecutor = workTaskExecutor;
        // Keep the Room database that stores the work
        mWorkDatabase = workDatabase;
        mSchedulers = schedulers;
        mProcessor = processor;
        mPreferenceUtils = new PreferenceUtils(workDatabase);
        mForceStopRunnableCompleted = false;

        // Checks for app force stops.
        mWorkTaskExecutor.executeOnBackgroundThread(new ForceStopRunnable(context, this));
    }

2.2 Enqueueing

When we call enqueue() on WorkManagerImpl to enqueue WorkRequests, the work is first handed to a WorkContinuationImpl:

	@Override
    @NonNull
    public Operation enqueue(
            @NonNull List<? extends WorkRequest> workRequests) {

        // This error is not being propagated as part of the Operation, as we want the
        // app to crash during development. Having no workRequests is always a developer error.
        if (workRequests.isEmpty()) {
            throw new IllegalArgumentException(
                    "enqueue needs at least one WorkRequest.");
        }
        return new WorkContinuationImpl(this, workRequests).enqueue();
    }

A WorkContinuationImpl is created first:

	WorkContinuationImpl(@NonNull WorkManagerImpl workManagerImpl,
            String name,
            ExistingWorkPolicy existingWorkPolicy,
            @NonNull List<? extends WorkRequest> work,
            @Nullable List<WorkContinuationImpl> parents) {
        // Keep the WorkManagerImpl
        mWorkManagerImpl = workManagerImpl;
        mName = name;
        // The entry-point constructor passes ExistingWorkPolicy.KEEP
        mExistingWorkPolicy = existingWorkPolicy;
        // Keep the list of WorkRequests
        mWork = work;
        // The entry-point constructor passes null
        mParents = parents;
        mIds = new ArrayList<>(mWork.size());
        mAllIds = new ArrayList<>();
        if (parents != null) {
            for (WorkContinuationImpl parent : parents) {
                mAllIds.addAll(parent.mAllIds);
            }
        }
        for (int i = 0; i < work.size(); i++) {
            String id = work.get(i).getStringId();
            mIds.add(id);
            mAllIds.add(id);
        }
    }

Then WorkContinuationImpl.enqueue() runs:

	@Override
    public @NonNull Operation enqueue() {
        // Only enqueue if not already enqueued.
        if (!mEnqueued) {
            // Run an EnqueueRunnable on the SerialExecutor backed by the Configuration's
            // executor, then take its Operation and return it
            // The runnable walks the hierarchy of the continuations
            // and marks them enqueued using the markEnqueued() method, parent first.
            EnqueueRunnable runnable = new EnqueueRunnable(this);
            mWorkManagerImpl.getWorkTaskExecutor().executeOnBackgroundThread(runnable);
            mOperation = runnable.getOperation();
        } else {
            Logger.get().warning(TAG,
                    String.format("Already enqueued work ids (%s)", TextUtils.join(", ", mIds)));
        }
        return mOperation;
    }

EnqueueRunnable does the actual enqueueing:

@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class EnqueueRunnable implements Runnable {

    private final WorkContinuationImpl mWorkContinuation;
    private final OperationImpl mOperation;

    public EnqueueRunnable(@NonNull WorkContinuationImpl workContinuation) {
        mWorkContinuation = workContinuation;
        mOperation = new OperationImpl();
    }
    
    @Override
    public void run() {
        try {
            // Check the continuation graph for cycles
            if (mWorkContinuation.hasCycles()) {
                throw new IllegalStateException(
                        String.format("WorkContinuation has cycles (%s)", mWorkContinuation));
            }
            // Persist the work; the return value says whether anything needs scheduling
            boolean needsScheduling = addToDatabase();
            if (needsScheduling) {
                // Enable RescheduleReceiver, only when there are Worker's that need scheduling.
                final Context context =
                        mWorkContinuation.getWorkManagerImpl().getApplicationContext();
                PackageManagerHelper.setComponentEnabled(context, RescheduleReceiver.class, true);
                // Schedule the work
                scheduleWorkInBackground();
            }
            mOperation.setState(Operation.SUCCESS);
        } catch (Throwable exception) {
            mOperation.setState(new Operation.State.FAILURE(exception));
        }
    }
}

addToDatabase():

	@VisibleForTesting
    public boolean addToDatabase() {
        WorkManagerImpl workManagerImpl = mWorkContinuation.getWorkManagerImpl();
        WorkDatabase workDatabase = workManagerImpl.getWorkDatabase();
        workDatabase.beginTransaction();
        try {
            boolean needsScheduling = processContinuation(mWorkContinuation);
            workDatabase.setTransactionSuccessful();
            return needsScheduling;
        } finally {
            workDatabase.endTransaction();
        }
    }

	private static boolean processContinuation(@NonNull WorkContinuationImpl workContinuation) {
        boolean needsScheduling = false;
        List<WorkContinuationImpl> parents = workContinuation.getParents();
        if (parents != null) {
            for (WorkContinuationImpl parent : parents) {
                // When chaining off a completed continuation we need to pay
                // attention to parents that may have been marked as enqueued before.
                if (!parent.isEnqueued()) {
                    needsScheduling |= processContinuation(parent);
                } else {
                    Logger.get().warning(TAG, String.format("Already enqueued work ids (%s).",
                            TextUtils.join(", ", parent.getIds())));
                }
            }
        }
        needsScheduling |= enqueueContinuation(workContinuation);
        return needsScheduling;
    }

The key step is scheduleWorkInBackground(), which schedules the work:

	@VisibleForTesting
    public void scheduleWorkInBackground() {
        WorkManagerImpl workManager = mWorkContinuation.getWorkManagerImpl();
        Schedulers.schedule(
                workManager.getConfiguration(),
                workManager.getWorkDatabase(),
                workManager.getSchedulers());
    }

	public static void schedule(
            @NonNull Configuration configuration,
            @NonNull WorkDatabase workDatabase,
            List<Scheduler> schedulers) {
        if (schedulers == null || schedulers.size() == 0) {
            return;
        }

        WorkSpecDao workSpecDao = workDatabase.workSpecDao();
        List<WorkSpec> eligibleWorkSpecs;

        // Start the database transaction
        workDatabase.beginTransaction();
        try {
            eligibleWorkSpecs = workSpecDao.getEligibleWorkForScheduling(
                    configuration.getMaxSchedulerLimit());
            if (eligibleWorkSpecs != null && eligibleWorkSpecs.size() > 0) {
                long now = System.currentTimeMillis();

                // The WorkSpecs are already in the database; this only flags each eligible one as scheduled
                // Mark all the WorkSpecs as scheduled.
                // Calls to Scheduler#schedule() could potentially result in more schedules
                // on a separate thread. Therefore, this needs to be done first.
                for (WorkSpec workSpec : eligibleWorkSpecs) {
                    workSpecDao.markWorkSpecScheduled(workSpec.id, now);
                }
            }
            workDatabase.setTransactionSuccessful();
        } finally {
            workDatabase.endTransaction();
        }

        if (eligibleWorkSpecs != null && eligibleWorkSpecs.size() > 0) {
            WorkSpec[] eligibleWorkSpecsArray = eligibleWorkSpecs.toArray(new WorkSpec[0]);
            // Hand the batch to every Scheduler, GreedyScheduler among them
            // Delegate to the underlying scheduler.
            for (Scheduler scheduler : schedulers) {
                scheduler.schedule(eligibleWorkSpecsArray);
            }
        }
    }
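The shape of Schedulers.schedule() is "mark the eligible batch first, then fan it out to every scheduler". The following is a minimal plain-Java sketch of that shape; ScheduleFanOutSketch, Store and markEligible are hypothetical names, and an in-memory list stands in for the WorkSpec table.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "mark as scheduled, then delegate to all schedulers".
final class ScheduleFanOutSketch {

    interface Scheduler {
        void schedule(List<String> ids);
    }

    // In-memory stand-in for the database.
    static final class Store {
        final List<String> pending = new ArrayList<>();
        final List<String> marked = new ArrayList<>();

        // Atomically picks up to `limit` pending ids and marks them as scheduled.
        synchronized List<String> markEligible(int limit) {
            int count = Math.min(limit, pending.size());
            List<String> batch = new ArrayList<>(pending.subList(0, count));
            pending.subList(0, count).clear();
            marked.addAll(batch);
            return batch;
        }
    }

    static void schedule(Store store, int limit, List<Scheduler> schedulers) {
        if (schedulers == null || schedulers.isEmpty()) {
            return;
        }
        // Marking happens before delegation, so a scheduler that triggers another
        // schedule() call on a different thread cannot pick the same work again.
        List<String> batch = store.markEligible(limit);
        if (batch.isEmpty()) {
            return;
        }
        for (Scheduler scheduler : schedulers) {
            scheduler.schedule(batch);
        }
    }
}
```

Every scheduler receives the same batch; which of them actually runs a given piece of work is decided inside each scheduler.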

Now inside GreedyScheduler:

	@Override
    public void schedule(@NonNull WorkSpec... workSpecs) {
        if (mIsMainProcess == null) {
            // The default process name is the package name.
            mIsMainProcess = TextUtils.equals(mContext.getPackageName(), getProcessName());
        }

        if (!mIsMainProcess) {
            Logger.get().info(TAG, "Ignoring schedule request in non-main process");
            return;
        }

        registerExecutionListenerIfNeeded();

        // Keep track of the list of new WorkSpecs whose constraints need to be tracked.
        // Add them to the known list of constrained WorkSpecs and call replace() on
        // WorkConstraintsTracker. That way we only need to synchronize on the part where we
        // are updating mConstrainedWorkSpecs.
        List<WorkSpec> constrainedWorkSpecs = new ArrayList<>();
        List<String> constrainedWorkSpecIds = new ArrayList<>();
        for (WorkSpec workSpec : workSpecs) {
            if (workSpec.state == WorkInfo.State.ENQUEUED
                    && !workSpec.isPeriodic()
                    && workSpec.initialDelay == 0L
                    && !workSpec.isBackedOff()) {
                // The work has constraints
                if (workSpec.hasConstraints()) {
                    if (SDK_INT >= 23 && workSpec.constraints.requiresDeviceIdle()) {
                        // Ignore requests that have an idle mode constraint.
                        Logger.get().debug(TAG,
                                String.format("Ignoring WorkSpec %s, Requires device idle.",
                                        workSpec));
                    } else if (SDK_INT >= 24 && workSpec.constraints.hasContentUriTriggers()) {
                        // Ignore requests that have content uri triggers.
                        Logger.get().debug(TAG,
                                String.format("Ignoring WorkSpec %s, Requires ContentUri triggers.",
                                        workSpec));
                    } else {
                        constrainedWorkSpecs.add(workSpec);
                        constrainedWorkSpecIds.add(workSpec.id);
                    }
                // The work has no constraints: start it right away
                } else {
                    Logger.get().debug(TAG, String.format("Starting work for %s", workSpec.id));
                    mWorkManagerImpl.startWork(workSpec.id);
                }
            }
        }

        // onExecuted() which is called on the main thread also modifies the list of mConstrained
        // WorkSpecs. Therefore we need to lock here.
        synchronized (mLock) {
            if (!constrainedWorkSpecs.isEmpty()) {
                Logger.get().debug(TAG, String.format("Starting tracking for [%s]",
                        TextUtils.join(",", constrainedWorkSpecIds)));
                mConstrainedWorkSpecs.addAll(constrainedWorkSpecs);
                mWorkConstraintsTracker.replace(mConstrainedWorkSpecs);
            }
        }
    }
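The branching in schedule() boils down to a four-way decision per WorkSpec. Below is a minimal plain-Java sketch of that decision; GreedyDecisionSketch, Spec and Decision are hypothetical names, the fields are simplified stand-ins for the WorkSpec properties used above, and the SDK_INT checks are left out.

```java
// Plain-Java model of how GreedyScheduler.schedule() treats one WorkSpec.
final class GreedyDecisionSketch {

    enum Decision { START_NOW, TRACK_CONSTRAINTS, IGNORED, NOT_HANDLED }

    // Simplified stand-in for WorkSpec; defaults describe plain one-time work.
    static final class Spec {
        boolean enqueued = true;
        boolean periodic;
        long initialDelayMillis;
        boolean backedOff;
        boolean hasConstraints;
        boolean requiresDeviceIdle;
        boolean hasContentUriTriggers;
    }

    static Decision decide(Spec spec) {
        // Only enqueued, one-time, undelayed, non-backed-off work is considered at all.
        boolean immediate = spec.enqueued
                && !spec.periodic
                && spec.initialDelayMillis == 0L
                && !spec.backedOff;
        if (!immediate) {
            return Decision.NOT_HANDLED;
        }
        if (!spec.hasConstraints) {
            return Decision.START_NOW;
        }
        // Idle-mode and content-URI constraints are skipped by this scheduler.
        if (spec.requiresDeviceIdle || spec.hasContentUriTriggers) {
            return Decision.IGNORED;
        }
        return Decision.TRACK_CONSTRAINTS;
    }

    public static void main(String[] args) {
        Spec constrained = new Spec();
        constrained.hasConstraints = true;
        System.out.println(decide(new Spec()));
        System.out.println(decide(constrained));
    }
}
```

Work that ends up as NOT_HANDLED or IGNORED here is not lost: the same batch was also handed to the other schedulers in the list.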

Work without constraints is handed to WorkManagerImpl to be started:

	@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
    public void startWork(@NonNull String workSpecId) {
        startWork(workSpecId, null);
    }
    
    public void startWork(
            @NonNull String workSpecId,
            @Nullable WorkerParameters.RuntimeExtras runtimeExtras) {
        mWorkTaskExecutor
                .executeOnBackgroundThread(
                        new StartWorkRunnable(this, workSpecId, runtimeExtras));
    }

StartWorkRunnable:

@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class StartWorkRunnable implements Runnable {

    private WorkManagerImpl mWorkManagerImpl;
    private String mWorkSpecId;
    private WorkerParameters.RuntimeExtras mRuntimeExtras;

    public StartWorkRunnable(
            WorkManagerImpl workManagerImpl,
            String workSpecId,
            WorkerParameters.RuntimeExtras runtimeExtras) {
        mWorkManagerImpl = workManagerImpl;
        mWorkSpecId = workSpecId;
        mRuntimeExtras = runtimeExtras;
    }

    @Override
    public void run() {
        // Hand the work over to the Processor
        mWorkManagerImpl.getProcessor().startWork(mWorkSpecId, mRuntimeExtras);
    }
}

Processor.startWork() then runs the work:

	public boolean startWork(
            @NonNull String id,
            @Nullable WorkerParameters.RuntimeExtras runtimeExtras) {

        WorkerWrapper workWrapper;
        synchronized (mLock) {
            // Work may get triggered multiple times if they have passing constraints
            // and new work with those constraints are added.
            if (mEnqueuedWorkMap.containsKey(id)) {
                Logger.get().debug(
                        TAG,
                        String.format("Work %s is already enqueued for processing", id));
                return false;
            }

            // Wrapper object around the Worker
            workWrapper =
                    new WorkerWrapper.Builder(
                            mAppContext,
                            mConfiguration,
                            mWorkTaskExecutor,
                            this,
                            mWorkDatabase,
                            id)
                            .withSchedulers(mSchedulers)
                            .withRuntimeExtras(runtimeExtras)
                            .build();
            ListenableFuture<Boolean> future = workWrapper.getFuture();
            future.addListener(
                    new FutureListener(this, id, future),
                    mWorkTaskExecutor.getMainThreadExecutor());
            mEnqueuedWorkMap.put(id, workWrapper);
        }
        // Run the wrapper on the background executor
        mWorkTaskExecutor.getBackgroundExecutor().execute(workWrapper);
        Logger.get().debug(TAG, String.format("%s: processing %s", getClass().getSimpleName(), id));
        return true;
    }

WorkerWrapper is a Runnable:

	@WorkerThread
    @Override
    public void run() {
        mTags = mWorkTagDao.getTagsForWorkSpecId(mWorkSpecId);
        mWorkDescription = createWorkDescription(mTags);
        runWorker();
    }

	private void runWorker() {
        if (tryCheckForInterruptionAndResolve()) {
            return;
        }

        mWorkDatabase.beginTransaction();
        try {
            mWorkSpec = mWorkSpecDao.getWorkSpec(mWorkSpecId);
            if (mWorkSpec == null) {
                Logger.get().error(
                        TAG,
                        String.format("Didn't find WorkSpec for id %s", mWorkSpecId));
                resolve(false);
                return;
            }

            // Do a quick check to make sure we don't need to bail out in case this work is already
            // running, finished, or is blocked.
            if (mWorkSpec.state != ENQUEUED) {
                resolveIncorrectStatus();
                mWorkDatabase.setTransactionSuccessful();
                Logger.get().debug(TAG,
                        String.format("%s is not in ENQUEUED state. Nothing more to do.",
                                mWorkSpec.workerClassName));
                return;
            }

            // Case 1:
            // Ensure that Workers that are backed off are only executed when they are supposed to.
            // GreedyScheduler can schedule WorkSpecs that have already been backed off because
            // it is holding on to snapshots of WorkSpecs. So WorkerWrapper needs to determine
            // if the ListenableWorker is actually eligible to execute at this point in time.

            // Case 2:
            // On API 23, we double scheduler Workers because JobScheduler prefers batching.
            // So is the Work is periodic, we only need to execute it once per interval.
            // Also potential bugs in the platform may cause a Job to run more than once.

            if (mWorkSpec.isPeriodic() || mWorkSpec.isBackedOff()) {
                long now = System.currentTimeMillis();
                // Allow first run of a PeriodicWorkRequest
                // to go through. This is because when periodStartTime=0;
                // calculateNextRunTime() always > now.
                // For more information refer to b/124274584
                boolean isFirstRun = mWorkSpec.periodStartTime == 0;
                if (!isFirstRun && now < mWorkSpec.calculateNextRunTime()) {
                    Logger.get().debug(TAG,
                            String.format(
                                    "Delaying execution for %s because it is being executed "
                                            + "before schedule.",
                                    mWorkSpec.workerClassName));
                    // For AlarmManager implementation we need to reschedule this kind  of Work.
                    // This is not a problem for JobScheduler because we will only reschedule
                    // work if JobScheduler is unaware of a jobId.
                    resolve(true);
                    return;
                }
            }

            // Needed for nested transactions, such as when we're in a dependent work request when
            // using a SynchronousExecutor.
            mWorkDatabase.setTransactionSuccessful();
        } finally {
            mWorkDatabase.endTransaction();
        }

        // Merge inputs.  This can be potentially expensive code, so this should not be done inside
        // a database transaction.
        Data input;
        if (mWorkSpec.isPeriodic()) {
            input = mWorkSpec.input;
        } else {
            InputMergerFactory inputMergerFactory = mConfiguration.getInputMergerFactory();
            String inputMergerClassName = mWorkSpec.inputMergerClassName;
            InputMerger inputMerger =
                    inputMergerFactory.createInputMergerWithDefaultFallback(inputMergerClassName);
            if (inputMerger == null) {
                Logger.get().error(TAG, String.format("Could not create Input Merger %s",
                        mWorkSpec.inputMergerClassName));
                setFailedAndResolve();
                return;
            }
            List<Data> inputs = new ArrayList<>();
            inputs.add(mWorkSpec.input);
            inputs.addAll(mWorkSpecDao.getInputsFromPrerequisites(mWorkSpecId));
            input = inputMerger.merge(inputs);
        }

        WorkerParameters params = new WorkerParameters(
                UUID.fromString(mWorkSpecId),
                input,
                mTags,
                mRuntimeExtras,
                mWorkSpec.runAttemptCount,
                mConfiguration.getExecutor(),
                mWorkTaskExecutor,
                mConfiguration.getWorkerFactory(),
                new WorkProgressUpdater(mWorkDatabase, mWorkTaskExecutor),
                new WorkForegroundUpdater(mForegroundProcessor, mWorkTaskExecutor));

        // Not always creating a worker here, as the WorkerWrapper.Builder can set a worker override
        // in test mode.
        if (mWorker == null) {
            mWorker = mConfiguration.getWorkerFactory().createWorkerWithDefaultFallback(
                    mAppContext,
                    mWorkSpec.workerClassName,
                    params);
        }

        if (mWorker == null) {
            Logger.get().error(TAG,
                    String.format("Could not create Worker %s", mWorkSpec.workerClassName));
            setFailedAndResolve();
            return;
        }

        if (mWorker.isUsed()) {
            Logger.get().error(TAG,
                    String.format("Received an already-used Worker %s; WorkerFactory should return "
                            + "new instances",
                            mWorkSpec.workerClassName));
            setFailedAndResolve();
            return;
        }
        mWorker.setUsed();

        // Try to set the work to the running state.  Note that this may fail because another thread
        // may have modified the DB since we checked last at the top of this function.
        if (trySetRunning()) {
            if (tryCheckForInterruptionAndResolve()) {
                return;
            }

            final SettableFuture<ListenableWorker.Result> future = SettableFuture.create();
            // Call mWorker.startWork() on the main thread.
            mWorkTaskExecutor.getMainThreadExecutor()
                    .execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                Logger.get().debug(TAG, String.format("Starting work for %s",
                                        mWorkSpec.workerClassName));
                                // Start the Worker's work
                                mInnerFuture = mWorker.startWork();
                                future.setFuture(mInnerFuture);
                            } catch (Throwable e) {
                                future.setException(e);
                            }

                        }
                    });

            // Avoid synthetic accessors.
            final String workDescription = mWorkDescription;
            future.addListener(new Runnable() {
                @Override
                @SuppressLint("SyntheticAccessor")
                public void run() {
                    try {
                        // If the ListenableWorker returns a null result treat it as a failure.
                        ListenableWorker.Result result = future.get();
                        if (result == null) {
                            Logger.get().error(TAG, String.format(
                                    "%s returned a null result. Treating it as a failure.",
                                    mWorkSpec.workerClassName));
                        } else {
                            Logger.get().debug(TAG, String.format("%s returned a %s result.",
                                    mWorkSpec.workerClassName, result));
                            mResult = result;
                        }
                    } catch (CancellationException exception) {
                        // Cancellations need to be treated with care here because innerFuture
                        // cancellations will bubble up, and we need to gracefully handle that.
                        Logger.get().info(TAG, String.format("%s was cancelled", workDescription),
                                exception);
                    } catch (InterruptedException | ExecutionException exception) {
                        Logger.get().error(TAG,
                                String.format("%s failed because it threw an exception/error",
                                        workDescription), exception);
                    } finally {
                        onWorkFinished();
                    }
                }
            }, mWorkTaskExecutor.getBackgroundExecutor());
        } else {
            resolveIncorrectStatus();
        }
    }

Finally, Worker's startWork() calls the doWork() method that we overrode:

	@Override
    public final @NonNull ListenableFuture<Result> startWork() {
        mFuture = SettableFuture.create();
        getBackgroundExecutor().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Run doWork() on the background executor and capture its result
                    Result result = doWork();
                    mFuture.set(result);
                } catch (Throwable throwable) {
                    mFuture.setException(throwable);
                }

            }
        });
        return mFuture;
    }
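
The pattern in startWork() (run a blocking method on an executor and publish its outcome through a future) can be tried outside Android. The following is a minimal sketch, not the library's code: it assumes java.util.concurrent.CompletableFuture as a stand-in for SettableFuture, and the class name StartWorkSketch and the Supplier parameter are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Simplified stand-in for Worker.startWork(); all names here are hypothetical.
final class StartWorkSketch {

    // Runs the blocking task on the executor and reports its outcome through a future.
    static <T> CompletableFuture<T> startWork(ExecutorService executor, Supplier<T> doWork) {
        CompletableFuture<T> future = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                future.complete(doWork.get());      // plays the role of mFuture.set(result)
            } catch (Throwable throwable) {
                future.completeExceptionally(throwable); // plays the role of mFuture.setException(...)
            }
        });
        return future;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Normal completion: the caller receives the value returned by the task.
            System.out.println(startWork(executor, () -> "success").join());

            // A thrown exception is captured by the future instead of crashing the thread.
            boolean failed = StartWorkSketch.<String>startWork(executor, () -> {
                throw new IllegalStateException("boom");
            }).handle((result, error) -> error != null).join();
            System.out.println("failed = " + failed);
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the exception is stored in the future, the caller decides how to treat it. In the listing above, WorkerWrapper does the same when future.get() throws an ExecutionException: it logs the error and still runs onWorkFinished().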

2.3 Conditional execution

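Suppose a task declares network connectivity as a constraint. If the network is disconnected after the task is enqueued but before it runs, and the connection comes back some time later, the task runs right away once the network is restored. The code below follows the AlarmManager-based implementation (SystemAlarmService), which WorkManager uses on devices below API 23; on newer devices scheduling is delegated to JobScheduler. The mechanism works as follows:

WorkManager contains a BroadcastReceiver named ConstraintProxy. When the network goes from disconnected to connected, the constraint state changes and ConstraintProxy receives the broadcast: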

abstract class ConstraintProxy extends BroadcastReceiver {

    @Override
    public void onReceive(Context context, Intent intent) {
        Intent constraintChangedIntent = CommandHandler.createConstraintsChangedIntent(context);
        context.startService(constraintChangedIntent);
    }
}
-----------------------------------------------------------------
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class CommandHandler implements ExecutionListener {
	static Intent createConstraintsChangedIntent(@NonNull Context context) {
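        // SystemAlarmService is a Service shipped with the WorkManager library, not an
        // OS-level service. It can still be killed, but it returns START_REDELIVER_INTENT,
        // so unacknowledged Intents are redelivered after a restart.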
        Intent intent = new Intent(context, SystemAlarmService.class);
        intent.setAction(ACTION_CONSTRAINTS_CHANGED);
        return intent;
    }
}

onReceive() starts SystemAlarmService with the action ACTION_CONSTRAINTS_CHANGED:

@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class SystemAlarmService extends LifecycleService
        implements SystemAlarmDispatcher.CommandsCompletedListener {
	
	@Override
    public void onCreate() {
        super.onCreate();
        initializeDispatcher();
        mIsShutdown = false;
    }

	@Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        super.onStartCommand(intent, flags, startId);
        if (mIsShutdown) {
            Logger.get().info(TAG,
                    "Re-initializing SystemAlarmDispatcher after a request to shut-down.");

            // Destroy the old dispatcher to complete its lifecycle.
            mDispatcher.onDestroy();
            // Create a new dispatcher to setup a new lifecycle.
            initializeDispatcher();
            // Set mIsShutdown to false, to correctly accept new commands.
            mIsShutdown = false;
        }

        if (intent != null) {
            // Hand the Intent over to mDispatcher
            mDispatcher.add(intent, startId);
        }

        // If the service were to crash, we want all unacknowledged Intents to get redelivered.
        return Service.START_REDELIVER_INTENT;
    }
}

SystemAlarmDispatcher then queues and processes the command:

	@MainThread
    public boolean add(@NonNull final Intent intent, final int startId) {
        ...

        intent.putExtra(KEY_START_ID, startId);
        synchronized (mIntents) {
            boolean hasCommands = !mIntents.isEmpty();
            mIntents.add(intent);
            if (!hasCommands) {
                // Only call processCommand if this is the first command.
                // The call to dequeueAndCheckForCompletion will process the remaining commands
                // in the order that they were added.
                processCommand();
            }
        }
        return true;
    }
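
The queueing rule in add() (only the first command starts processing, and commands added while one is being handled wait their turn) can be modelled in plain Java. This is a single-threaded sketch under stated assumptions: commands are plain strings rather than Intents, the class name CommandQueueSketch and its handler callback are hypothetical, and the main-thread/background-thread hand-off of the real dispatcher is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;

// Single-threaded model of the add()/dequeue cycle; not the library's implementation.
final class CommandQueueSketch {
    private final Deque<String> commands = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();
    private final BiConsumer<CommandQueueSketch, String> handler;

    CommandQueueSketch(BiConsumer<CommandQueueSketch, String> handler) {
        this.handler = handler;
    }

    void add(String command) {
        boolean hasCommands = !commands.isEmpty();
        commands.addLast(command);
        if (!hasCommands) {
            // Only the first command kicks off processing; later ones are drained in order.
            processAll();
        }
    }

    private void processAll() {
        while (!commands.isEmpty()) {
            // The command stays at the head of the queue while it is being handled,
            // so an add() issued by the handler sees a non-empty queue and only enqueues.
            String current = commands.peekFirst();
            handler.accept(this, current);
            processed.add(current);
            commands.removeFirst();
        }
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        CommandQueueSketch queue = new CommandQueueSketch((q, command) -> {
            if (command.equals("ACTION_CONSTRAINTS_CHANGED")) {
                q.add("ACTION_DELAY_MET"); // follow-up command queued during handling
            }
        });
        queue.add("ACTION_CONSTRAINTS_CHANGED");
        System.out.println(queue.processed());
        // prints [ACTION_CONSTRAINTS_CHANGED, ACTION_DELAY_MET]
    }
}
```

Keeping the current command in the queue until it has been handled is what makes the "is this the first command?" check reliable: a follow-up command can never start a second, overlapping processing pass.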

processCommand() in turn hands the Intent to CommandHandler:

	@MainThread
    @SuppressWarnings("FutureReturnValueIgnored")
    private void processCommand() {
        assertMainThread();
        PowerManager.WakeLock processCommandLock =
                WakeLocks.newWakeLock(mContext, PROCESS_COMMAND_TAG);
        try {
            processCommandLock.acquire();
            // Process commands on the background thread.
            mWorkManager.getWorkTaskExecutor().executeOnBackgroundThread(new Runnable() {
                @Override
                public void run() {
                    synchronized (mIntents) {
                        mCurrentIntent = mIntents.get(0);
                    }

                    if (mCurrentIntent != null) {
                        final String action = mCurrentIntent.getAction();
                        final int startId = mCurrentIntent.getIntExtra(KEY_START_ID,
                                DEFAULT_START_ID);
                        Logger.get().debug(TAG,
                                String.format("Processing command %s, %s", mCurrentIntent,
                                        startId));
                        final PowerManager.WakeLock wakeLock = WakeLocks.newWakeLock(
                                mContext,
                                String.format("%s (%s)", action, startId));
                        try {
                            Logger.get().debug(TAG, String.format(
                                    "Acquiring operation wake lock (%s) %s",
                                    action,
                                    wakeLock));

                            wakeLock.acquire();
                            // CommandHandler is the single place where every Intent is handled
                            mCommandHandler.onHandleIntent(mCurrentIntent, startId,
                                    SystemAlarmDispatcher.this);
                        } catch (Throwable throwable) {
                            Logger.get().error(
                                    TAG,
                                    "Unexpected error in onHandleIntent",
                                    throwable);
                        }  finally {
                            Logger.get().debug(
                                    TAG,
                                    String.format(
                                            "Releasing operation wake lock (%s) %s",
                                            action,
                                            wakeLock));
                            wakeLock.release();
                            // Check if we have processed all commands
                            postOnMainThread(
                                    new DequeueAndCheckForCompletion(SystemAlarmDispatcher.this));
                        }
                    }
                }
            });
        } finally {
            processCommandLock.release();
        }
    }

CommandHandler is where the Intent is actually dispatched according to its action:

	@WorkerThread
    void onHandleIntent(
            @NonNull Intent intent,
            int startId,
            @NonNull SystemAlarmDispatcher dispatcher) {

        String action = intent.getAction();

        if (ACTION_CONSTRAINTS_CHANGED.equals(action)) {
            handleConstraintsChanged(intent, startId, dispatcher);
        } else if (ACTION_RESCHEDULE.equals(action)) {
            handleReschedule(intent, startId, dispatcher);
        } else {
            Bundle extras = intent.getExtras();
            if (!hasKeys(extras, KEY_WORKSPEC_ID)) {
                Logger.get().error(TAG,
                        String.format("Invalid request for %s, requires %s.",
                                action,
                                KEY_WORKSPEC_ID));
            } else {
                if (ACTION_SCHEDULE_WORK.equals(action)) {
                    handleScheduleWorkIntent(intent, startId, dispatcher);
                } else if (ACTION_DELAY_MET.equals(action)) {
                    handleDelayMet(intent, startId, dispatcher);
                } else if (ACTION_STOP_WORK.equals(action)) {
                    handleStopWork(intent, dispatcher);
                } else if (ACTION_EXECUTION_COMPLETED.equals(action)) {
                    handleExecutionCompleted(intent, startId);
                } else {
                    Logger.get().warning(TAG, String.format("Ignoring intent %s", intent));
                }
            }
        }
    }

The ACTION_CONSTRAINTS_CHANGED checked in the first if branch is exactly the action that was passed in earlier, so execution enters handleConstraintsChanged():

	private void handleConstraintsChanged(
            @NonNull Intent intent, int startId,
            @NonNull SystemAlarmDispatcher dispatcher) {

        // Constraints changed command handler is synchronous. No cleanup
        // is necessary.
        ConstraintsCommandHandler changedCommandHandler =
                new ConstraintsCommandHandler(mContext, startId, dispatcher);
        changedCommandHandler.handleConstraintsChanged();
    }

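ConstraintsCommandHandler does not modify the incoming ACTION_CONSTRAINTS_CHANGED Intent. Instead, for every WorkSpec that is due and whose constraints are met, it creates a new Intent with the action ACTION_DELAY_MET and posts it back to the dispatcher: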

	@WorkerThread
    void handleConstraintsChanged() {
        List<WorkSpec> candidates = mDispatcher.getWorkManager().getWorkDatabase()
                .workSpecDao()
                .getScheduledWork();

        // Update constraint proxy to potentially disable proxies for previously
        // completed WorkSpecs.
        ConstraintProxy.updateAll(mContext, candidates);

        // This needs to be done to populate matching WorkSpec ids in every constraint controller.
        mWorkConstraintsTracker.replace(candidates);

        List<WorkSpec> eligibleWorkSpecs = new ArrayList<>(candidates.size());
        // Filter candidates should have already been scheduled.
        long now = System.currentTimeMillis();
        for (WorkSpec workSpec : candidates) {
            String workSpecId = workSpec.id;
            long triggerAt = workSpec.calculateNextRunTime();
            if (now >= triggerAt && (!workSpec.hasConstraints()
                    || mWorkConstraintsTracker.areAllConstraintsMet(workSpecId))) {
                eligibleWorkSpecs.add(workSpec);
            }
        }

        for (WorkSpec workSpec : eligibleWorkSpecs) {
            String workSpecId = workSpec.id;
            // Create a new Intent whose action is ACTION_DELAY_MET
            Intent intent = CommandHandler.createDelayMetIntent(mContext, workSpecId);
            mDispatcher.postOnMainThread(
                    new SystemAlarmDispatcher.AddRunnable(mDispatcher, intent, mStartId));
        }

        mWorkConstraintsTracker.reset();
    }
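
The filtering condition in the loop above is easy to misread, so here is a small self-contained model of it. It is only a sketch: the Spec class is a hypothetical stand-in for WorkSpec, and its fields replace the calls to calculateNextRunTime(), hasConstraints() and areAllConstraintsMet().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Model of the eligibility check in handleConstraintsChanged(); names are hypothetical.
final class EligibilitySketch {

    static final class Spec {
        final String id;
        final long nextRunTimeMillis;  // stand-in for calculateNextRunTime()
        final boolean hasConstraints;  // stand-in for hasConstraints()
        final boolean constraintsMet;  // stand-in for areAllConstraintsMet(id)

        Spec(String id, long nextRunTimeMillis, boolean hasConstraints, boolean constraintsMet) {
            this.id = id;
            this.nextRunTimeMillis = nextRunTimeMillis;
            this.hasConstraints = hasConstraints;
            this.constraintsMet = constraintsMet;
        }
    }

    // A spec is eligible when it is due AND it is either unconstrained or fully satisfied.
    static List<String> eligibleIds(List<Spec> candidates, long nowMillis) {
        List<String> eligible = new ArrayList<>(candidates.size());
        for (Spec spec : candidates) {
            boolean due = nowMillis >= spec.nextRunTimeMillis;
            boolean unblocked = !spec.hasConstraints || spec.constraintsMet;
            if (due && unblocked) {
                eligible.add(spec.id);
            }
        }
        return eligible;
    }

    public static void main(String[] args) {
        List<Spec> candidates = Arrays.asList(
                new Spec("due-no-constraints", 1_000L, false, false),
                new Spec("due-network-met", 1_000L, true, true),
                new Spec("due-network-unmet", 1_000L, true, false),
                new Spec("not-due-yet", 9_000L, false, false));
        System.out.println(eligibleIds(candidates, 5_000L));
        // prints [due-no-constraints, due-network-met]
    }
}
```

In the network example from the start of this section, the task moves from the "due-network-unmet" case to the "due-network-met" case when connectivity returns, which is why it is picked up on the next ACTION_CONSTRAINTS_CHANGED.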

CommandHandler creates the Intent whose action is ACTION_DELAY_MET:

	static Intent createDelayMetIntent(@NonNull Context context, @NonNull String workSpecId) {
        Intent intent = new Intent(context, SystemAlarmService.class);
        intent.setAction(ACTION_DELAY_MET);
        intent.putExtra(KEY_WORKSPEC_ID, workSpecId);
        return intent;
    }

Back in CommandHandler's onHandleIntent(), there is a branch for ACTION_DELAY_MET, which calls handleDelayMet():

	private void handleDelayMet(
            @NonNull Intent intent,
            int startId,
            @NonNull SystemAlarmDispatcher dispatcher) {

        Bundle extras = intent.getExtras();
        synchronized (mLock) {
            String workSpecId = extras.getString(KEY_WORKSPEC_ID);

            // Check to see if we are already handling an ACTION_DELAY_MET for the WorkSpec.
            // If we are, then there is nothing for us to do.
            if (!mPendingDelayMet.containsKey(workSpecId)) {
                DelayMetCommandHandler delayMetCommandHandler =
                        new DelayMetCommandHandler(mContext, startId, workSpecId, dispatcher);
                mPendingDelayMet.put(workSpecId, delayMetCommandHandler);
                delayMetCommandHandler.handleProcessWork();
            } else {
                // log...
            }
        }
    }
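
handleDelayMet() uses mPendingDelayMet to make sure that a WorkSpec with a delay-met command already in flight is not started a second time. The idea can be sketched as follows. The class name PendingDelayMetSketch is hypothetical, the map stores the startId instead of a DelayMetCommandHandler, and the onExecutionCompleted() method that clears the entry is an assumption added so the example is complete; it is not part of the listing above.

```java
import java.util.HashMap;
import java.util.Map;

// Model of the "already handling this WorkSpec?" guard; not the library's code.
final class PendingDelayMetSketch {
    private final Object lock = new Object();
    // workSpecId -> startId of the command currently being handled
    private final Map<String, Integer> pending = new HashMap<>();

    // Returns true if processing was started, false if the id was already pending.
    boolean handleDelayMet(String workSpecId, int startId) {
        synchronized (lock) {
            if (pending.containsKey(workSpecId)) {
                return false; // a command for this WorkSpec is already in flight
            }
            pending.put(workSpecId, startId);
            return true;
        }
    }

    // Assumed cleanup hook: once execution finishes, the id may be handled again.
    void onExecutionCompleted(String workSpecId) {
        synchronized (lock) {
            pending.remove(workSpecId);
        }
    }

    public static void main(String[] args) {
        PendingDelayMetSketch handler = new PendingDelayMetSketch();
        System.out.println(handler.handleDelayMet("work-1", 1)); // true
        System.out.println(handler.handleDelayMet("work-1", 2)); // false
        handler.onExecutionCompleted("work-1");
        System.out.println(handler.handleDelayMet("work-1", 3)); // true
    }
}
```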

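In DelayMetCommandHandler, handleProcessWork() calls onAllConstraintsMet() directly when the work has no constraints; otherwise it registers the WorkSpec with WorkConstraintsTracker, which calls onAllConstraintsMet() once the constraints are satisfied. onAllConstraintsMet() then calls Processor's startWork(), which eventually runs the doWork() we overrode in our Worker: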

	@WorkerThread
    void handleProcessWork() {
        ...

        if (!mHasConstraints) {
            onAllConstraintsMet(Collections.singletonList(mWorkSpecId));
        } else {
            // Allow tracker to report constraint changes
            mWorkConstraintsTracker.replace(Collections.singletonList(workSpec));
        }
    }

	@Override
    public void onAllConstraintsMet(@NonNull List<String> workSpecIds) {
        // WorkConstraintsTracker will call onAllConstraintsMet with list of workSpecs whose
        // constraints are met. Ensure the workSpecId we are interested is part of the list
        // before we call Processor#startWork().
        if (!workSpecIds.contains(mWorkSpecId)) {
            return;
        }

        synchronized (mLock) {
            if (mCurrentState == STATE_INITIAL) {
                mCurrentState = STATE_START_REQUESTED;

                // Call Processor's startWork()
                // Constraints met, schedule execution
                // Not using WorkManagerImpl#startWork() here because we need to know if the
                // processor actually enqueued the work here.
                boolean isEnqueued = mDispatcher.getProcessor().startWork(mWorkSpecId);

                if (isEnqueued) {
                    // setup timers to enforce quotas on workers that have
                    // been enqueued
                    mDispatcher.getWorkTimer()
                            .startTimer(mWorkSpecId, WORK_PROCESSING_TIME_IN_MS, this);
                } else {
                    // if we did not actually enqueue the work, it was enqueued before
                    // cleanUp and pretend this never happened.
                    cleanUp();
                }
            } else {
                Logger.get().debug(TAG, String.format("Already started work for %s", mWorkSpecId));
            }
        }
    }
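
onAllConstraintsMet() can be invoked more than once (directly from handleProcessWork() or by the constraints tracker), so the state check is what guarantees that the work is started at most once. A minimal sketch of that guard, assuming a hypothetical class StartOnceSketch with a counter in place of the real Processor call:

```java
import java.util.Collections;
import java.util.List;

// Model of the STATE_INITIAL -> STATE_START_REQUESTED guard; names are hypothetical.
final class StartOnceSketch {
    private enum State { INITIAL, START_REQUESTED }

    private final Object lock = new Object();
    private final String workSpecId;
    private State state = State.INITIAL;
    private int startCount = 0; // stands in for calls to Processor#startWork()

    StartOnceSketch(String workSpecId) {
        this.workSpecId = workSpecId;
    }

    // Returns true only for the single call that actually requests the start.
    boolean onAllConstraintsMet(List<String> workSpecIds) {
        if (!workSpecIds.contains(workSpecId)) {
            return false; // the callback is about other WorkSpecs
        }
        synchronized (lock) {
            if (state != State.INITIAL) {
                return false; // already started, ignore repeated callbacks
            }
            state = State.START_REQUESTED;
            startCount++;
            return true;
        }
    }

    int startCount() {
        synchronized (lock) {
            return startCount;
        }
    }

    public static void main(String[] args) {
        StartOnceSketch handler = new StartOnceSketch("work-1");
        System.out.println(handler.onAllConstraintsMet(Collections.singletonList("work-2"))); // false
        System.out.println(handler.onAllConstraintsMet(Collections.singletonList("work-1"))); // true
        System.out.println(handler.onAllConstraintsMet(Collections.singletonList("work-1"))); // false
        System.out.println(handler.startCount()); // 1
    }
}
```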


