WorkManager
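The WorkManager API makes it easy to schedule deferrable, asynchronous tasks that must run reliably. With these APIs you create a task and hand it to WorkManager, which runs it once the work's constraints are met.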
1. Usage
1.1 Basic usage
Define the task:
public class MainWorker1 extends Worker {
    private static final String TAG = MainWorker1.class.getSimpleName();
    public MainWorker1(@NonNull Context context, @NonNull WorkerParameters workerParams) {
        super(context, workerParams);
    }
    // Runs the background work off the main thread; Result reports the outcome
    @NonNull
    @Override
    public Result doWork() {
        // Read the input passed to this task
        String value = getInputData().getString("key");
        Log.d(TAG, "doWork: " + value);
        // Output from the task is wrapped in the returned Result
        Data outputData = new Data.Builder()
                .putString("result", "result")
                .build();
        // Result.success() is the public factory for a successful result
        return Result.success(outputData);
    }
}
Using the task:
public void onTest1Click(View view) {
    // Input passed to the task
    Data inputData = new Data.Builder()
            .putString("key", "value111")
            .build();
    OneTimeWorkRequest request = new OneTimeWorkRequest.Builder(MainWorker1.class)
            .setInputData(inputData)
            .build();
    // Observe the WorkInfo, which carries both the output data and the run state
    WorkManager.getInstance(this).getWorkInfoByIdLiveData(request.getId())
            .observe(this, new Observer<WorkInfo>() {
                @Override
                public void onChanged(WorkInfo workInfo) {
                    // WorkInfo can be null if the id is not known to WorkManager
                    if (workInfo == null) {
                        return;
                    }
                    // Data returned by the Worker
                    Log.d(TAG, "Data returned by the task: " + workInfo.getOutputData().getString("result"));
                    // Log the work state. This run goes through ENQUEUED, RUNNING and SUCCEEDED;
                    // the other possible states are FAILED, BLOCKED and CANCELLED
                    Log.d(TAG, "onChanged: " + workInfo.getState().name());
                    if (workInfo.getState().isFinished()) {
                        Log.d(TAG, "onChanged: task finished!");
                    }
                }
            });
    // Enqueue the task
    WorkManager.getInstance(this).enqueue(request);
}
Log output:
D/MainWorker1: doWork: value111
D/BaseActivity: Data returned by the task: null
D/BaseActivity: onChanged: ENQUEUED
D/BaseActivity: Data returned by the task: null
D/BaseActivity: onChanged: RUNNING
D/BaseActivity: Data returned by the task: result
D/BaseActivity: onChanged: SUCCEEDED
D/BaseActivity: onChanged: task finished!
As the log shows, the Worker's output data is null while the work is ENQUEUED and RUNNING; it only becomes available once the work has finished (SUCCEEDED here).
1.2 Chaining tasks
WorkManager.getInstance(myContext)
        // Candidates to run in parallel
        .beginWith(Arrays.asList(plantName1, plantName2, plantName3))
        // Dependent work (only runs after all previous work in chain)
        .then(cache)
        .then(upload)
        // Call enqueue to kick things off
        .enqueue();
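The ordering guarantee of the chain above (three requests that may run in parallel, then cache, then upload) can be illustrated with a rough plain-Java analogy. This is only a sketch built on CompletableFuture, not how WorkManager implements chains; the class ChainOrderSketch and the helper runStep are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java analogy for beginWith(a, b, c).then(cache).then(upload).
// Not WorkManager code: it only illustrates the ordering guarantee.
class ChainOrderSketch {
    // Thread-safe record of the order in which steps ran.
    private static final List<String> LOG =
            Collections.synchronizedList(new ArrayList<String>());

    // Hypothetical helper: "runs" a step by recording its name.
    static void runStep(String name) {
        LOG.add(name);
    }

    static List<String> runChain() {
        LOG.clear();
        // The first three steps are candidates for parallel execution.
        CompletableFuture<Void> p1 = CompletableFuture.runAsync(() -> runStep("plantName1"));
        CompletableFuture<Void> p2 = CompletableFuture.runAsync(() -> runStep("plantName2"));
        CompletableFuture<Void> p3 = CompletableFuture.runAsync(() -> runStep("plantName3"));
        // Dependent steps start only after all previous steps have completed.
        CompletableFuture.allOf(p1, p2, p3)
                .thenRun(() -> runStep("cache"))
                .thenRun(() -> runStep("upload"))
                .join();
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        // The first three names may appear in any order; cache and upload come last.
        System.out.println(runChain());
    }
}
```

In real WorkManager chains, a failed or cancelled parent also prevents its dependents from running, which this analogy does not model.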
1.3 Periodic tasks
public void onTestPeriodicWorkRequest(View view) {
    // A 10-second interval is requested here, but WorkManager strictly enforces a minimum interval of 15 minutes for periodic work
    PeriodicWorkRequest periodicWorkRequest = new PeriodicWorkRequest.Builder(MainWorker1.class,
            10, TimeUnit.SECONDS).build();
    WorkManager.getInstance(this).getWorkInfoByIdLiveData(periodicWorkRequest.getId())
            .observe(this, new Observer<WorkInfo>() {
                @Override
                public void onChanged(WorkInfo workInfo) {
                    Log.d(TAG, "Task state: " + workInfo.getState().name());
                    if (workInfo.getState().isFinished()) {
                        Log.d(TAG, "Task finished!");
                    }
                }
            });
    WorkManager.getInstance(this).enqueue(periodicWorkRequest);
}
Two things to note about periodic tasks:
- If the interval set on a PeriodicWorkRequest is shorter than 15 minutes, WorkManager automatically raises it to 15 minutes.
- Because the task repeats, the observed states are ENQUEUED, then RUNNING while the task executes; after the interval elapses the same states repeat. The work never reports SUCCEEDED.
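The first point amounts to clamping the requested interval to a 15-minute floor. A minimal plain-Java sketch of that rule follows; the class and the helper effectiveIntervalMillis are hypothetical, and only the 15-minute minimum itself comes from WorkManager.

```java
import java.util.concurrent.TimeUnit;

// Sketch of the minimum-interval rule for periodic work (not library code).
class PeriodicIntervalSketch {
    // Same value as WorkManager's minimum periodic interval: 15 minutes.
    static final long MIN_PERIODIC_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(15);

    // Hypothetical helper: the interval that effectively applies to a request.
    static long effectiveIntervalMillis(long requested, TimeUnit unit) {
        return Math.max(unit.toMillis(requested), MIN_PERIODIC_INTERVAL_MILLIS);
    }

    public static void main(String[] args) {
        // A 10-second request is raised to the 15-minute floor.
        System.out.println(effectiveIntervalMillis(10, TimeUnit.SECONDS));
        // A 1-hour request is above the floor and is kept as-is.
        System.out.println(effectiveIntervalMillis(1, TimeUnit.HOURS));
    }
}
```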
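1.4 Constraints
@RequiresApi(api = Build.VERSION_CODES.M)
public void testBackgroundWork5(View view) {
    // Constraints: the background task runs only when all of them are met (network connected, charging, and device idle)
    Constraints constraints = new Constraints.Builder()
            .setRequiredNetworkType(NetworkType.CONNECTED) // network connected
            .setRequiresCharging(true) // charging
            .setRequiresDeviceIdle(true) // device idle
            .build();
    /**
     * Constraints that WorkManager offers as conditions for running work:
     * setRequiredNetworkType: the required network type
     * setRequiresBatteryNotLow: run only when the battery is not low; default false
     * setRequiresCharging: run only while the device is plugged in; default false
     * setRequiresDeviceIdle: run only while the device is idle; default false
     * setRequiresStorageNotLow: run only when available storage is not below the critical threshold
     */
    // The request object
    OneTimeWorkRequest request = new OneTimeWorkRequest.Builder(MainWorker3.class)
            .setConstraints(constraints) // attach the constraints to the request
            .build();
    // Enqueue
    WorkManager.getInstance(this).enqueue(request);
}
All task information is persisted in a Room database, so a submitted task still runs once its constraints are met, even if the app process has since been killed or the device has rebooted. (Uninstalling the app removes this database and the pending work with it.) This decouples the task from the lifetime of the app process.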
2. How it works
2.1 Initialization
Every use starts by obtaining a WorkManager instance through WorkManager.getInstance() and then enqueuing work on it:
public static @NonNull WorkManager getInstance(@NonNull Context context) {
return WorkManagerImpl.getInstance(context);
}
WorkManagerImpl returns a singleton:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public static @NonNull WorkManagerImpl getInstance(@NonNull Context context) {
synchronized (sLock) {
WorkManagerImpl instance = getInstance();
if (instance == null) {
Context appContext = context.getApplicationContext();
// To create the instance on demand, appContext must implement Configuration.Provider
if (appContext instanceof Configuration.Provider) {
initialize(
appContext,
((Configuration.Provider) appContext).getWorkManagerConfiguration());
instance = getInstance(appContext);
} else {
throw new IllegalStateException("WorkManager is not initialized properly. You "
+ "have explicitly disabled WorkManagerInitializer in your manifest, "
+ "have not manually called WorkManager#initialize at this point, and "
+ "your Application does not implement Configuration.Provider.");
}
}
return instance;
}
}
This initialization involves two singletons in WorkManagerImpl: sDefaultInstance and sDelegatedInstance. Start with the no-argument getInstance(): it returns sDelegatedInstance if that is non-null, and sDefaultInstance otherwise:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public static @Nullable WorkManagerImpl getInstance() {
synchronized (sLock) {
if (sDelegatedInstance != null) {
return sDelegatedInstance;
}
return sDefaultInstance;
}
}
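If that method returns null, the instance has to be created through initialize(). On this on-demand path there is one precondition: the application Context must implement the Configuration.Provider interface: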
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public static void initialize(@NonNull Context context, @NonNull Configuration configuration) {
synchronized (sLock) {
if (sDelegatedInstance != null && sDefaultInstance != null) {
throw new IllegalStateException("WorkManager is already initialized. Did you "
+ "try to initialize it manually without disabling "
+ "WorkManagerInitializer? See "
+ "WorkManager#initialize(Context, Configuration) or the class level "
+ "Javadoc for more information.");
}
if (sDelegatedInstance == null) {
context = context.getApplicationContext();
if (sDefaultInstance == null) {
sDefaultInstance = new WorkManagerImpl(
context,
configuration,
new WorkManagerTaskExecutor(configuration.getTaskExecutor()));
}
sDelegatedInstance = sDefaultInstance;
}
}
}
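The interplay of the three methods above can be modelled in plain Java. The sketch below is a simplification under stated assumptions: ConfigProvider and Manager are hypothetical stand-ins for Configuration.Provider and WorkManagerImpl, a String stands in for the Configuration, and instance fields replace the static fields so the model can be exercised in isolation.

```java
// Simplified model of the default/delegated instance handling (not library code).
class InstanceInitSketch {
    // Hypothetical stand-in for Configuration.Provider.
    interface ConfigProvider {
        String getConfiguration();
    }

    // Hypothetical stand-in for WorkManagerImpl.
    static final class Manager {
        final String configuration;

        Manager(String configuration) {
            this.configuration = configuration;
        }
    }

    private final Object lock = new Object();
    private Manager defaultInstance;
    private Manager delegatedInstance;

    // No-argument lookup: prefer the delegated instance, else the default one.
    Manager getInstance() {
        synchronized (lock) {
            return delegatedInstance != null ? delegatedInstance : defaultInstance;
        }
    }

    // Explicit initialization: fails if both instances are already set.
    void initialize(String configuration) {
        synchronized (lock) {
            if (delegatedInstance != null && defaultInstance != null) {
                throw new IllegalStateException("already initialized");
            }
            if (delegatedInstance == null) {
                if (defaultInstance == null) {
                    defaultInstance = new Manager(configuration);
                }
                delegatedInstance = defaultInstance;
            }
        }
    }

    // On-demand lookup: "appContext" stands in for the Application object.
    Manager getInstance(Object appContext) {
        synchronized (lock) {
            Manager instance = getInstance();
            if (instance == null) {
                if (!(appContext instanceof ConfigProvider)) {
                    throw new IllegalStateException("not initialized and no provider");
                }
                initialize(((ConfigProvider) appContext).getConfiguration());
                instance = getInstance();
            }
            return instance;
        }
    }

    public static void main(String[] args) {
        InstanceInitSketch sketch = new InstanceInitSketch();
        ConfigProvider app = () -> "custom-config";
        System.out.println(sketch.getInstance(app).configuration);
    }
}
```

Once an instance exists, the provider is no longer consulted, and a second explicit initialize() call fails, which mirrors the "already initialized" exception in the listing above.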
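In practice, this initialization is usually not triggered by our own code. In the version examined here it is performed by WorkManagerInitializer, a ContentProvider that ships inside WorkManager. Its declaration can be found in the AndroidManifest of the built APK: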
<provider
android:name="androidx.work.impl.WorkManagerInitializer"
android:exported="false"
android:multiprocess="true"
android:authorities="com.demo.workmanager.workmanager-init"
android:directBootAware="false" />
The first initialization happens in WorkManagerInitializer's onCreate():
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class WorkManagerInitializer extends ContentProvider {
@Override
public boolean onCreate() {
// This ends up calling WorkManagerImpl.initialize()
// Initialize WorkManager with the default configuration.
WorkManager.initialize(getContext(), new Configuration.Builder().build());
return true;
}
}
Next, let's look at what the WorkManagerImpl constructor actually does:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public WorkManagerImpl(
@NonNull Context context,
@NonNull Configuration configuration,
@NonNull TaskExecutor workTaskExecutor,
@NonNull WorkDatabase database) {
Context applicationContext = context.getApplicationContext();
Logger.setLogger(new Logger.LogcatLogger(configuration.getMinimumLoggingLevel()));
// Create the Schedulers; one of them is GreedyScheduler, which starts eligible work right away
List<Scheduler> schedulers = createSchedulers(applicationContext, workTaskExecutor);
Processor processor = new Processor(
context,
configuration,
workTaskExecutor,
database,
schedulers);
internalInit(context, configuration, workTaskExecutor, database, schedulers, processor);
}
private void internalInit(@NonNull Context context,
@NonNull Configuration configuration,
@NonNull TaskExecutor workTaskExecutor,
@NonNull WorkDatabase workDatabase,
@NonNull List<Scheduler> schedulers,
@NonNull Processor processor) {
context = context.getApplicationContext();
mContext = context;
// Store the configuration, including the executors used to run work
mConfiguration = configuration;
mWorkTaskExecutor = workTaskExecutor;
// Store the Room database used to persist work
mWorkDatabase = workDatabase;
mSchedulers = schedulers;
mProcessor = processor;
mPreferenceUtils = new PreferenceUtils(workDatabase);
mForceStopRunnableCompleted = false;
// Checks for app force stops.
mWorkTaskExecutor.executeOnBackgroundThread(new ForceStopRunnable(context, this));
}
2.2 Enqueuing
When we call WorkManagerImpl's enqueue() to enqueue a WorkRequest, the request is first handed to a WorkContinuationImpl:
@Override
@NonNull
public Operation enqueue(
@NonNull List<? extends WorkRequest> workRequests) {
// This error is not being propagated as part of the Operation, as we want the
// app to crash during development. Having no workRequests is always a developer error.
if (workRequests.isEmpty()) {
throw new IllegalArgumentException(
"enqueue needs at least one WorkRequest.");
}
return new WorkContinuationImpl(this, workRequests).enqueue();
}
A WorkContinuationImpl is created first:
WorkContinuationImpl(@NonNull WorkManagerImpl workManagerImpl,
String name,
ExistingWorkPolicy existingWorkPolicy,
@NonNull List<? extends WorkRequest> work,
@Nullable List<WorkContinuationImpl> parents) {
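// Store the WorkManagerImpl
mWorkManagerImpl = workManagerImpl;
mName = name;
// The constructor used on the enqueue() path passes ExistingWorkPolicy.KEEP
mExistingWorkPolicy = existingWorkPolicy;
// Store the list of WorkRequests
mWork = work;
// The constructor used on the enqueue() path passes null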
mParents = parents;
mIds = new ArrayList<>(mWork.size());
mAllIds = new ArrayList<>();
if (parents != null) {
for (WorkContinuationImpl parent : parents) {
mAllIds.addAll(parent.mAllIds);
}
}
for (int i = 0; i < work.size(); i++) {
String id = work.get(i).getStringId();
mIds.add(id);
mAllIds.add(id);
}
}
Next, WorkContinuationImpl's enqueue() runs:
@Override
public @NonNull Operation enqueue() {
// Only enqueue if not already enqueued.
if (!mEnqueued) {
// Run the EnqueueRunnable on the SerialExecutor backed by the default thread pool
// from Configuration, then fetch its mOperation object and return it
// The runnable walks the hierarchy of the continuations
// and marks them enqueued using the markEnqueued() method, parent first.
EnqueueRunnable runnable = new EnqueueRunnable(this);
mWorkManagerImpl.getWorkTaskExecutor().executeOnBackgroundThread(runnable);
mOperation = runnable.getOperation();
} else {
Logger.get().warning(TAG,
String.format("Already enqueued work ids (%s)", TextUtils.join(", ", mIds)));
}
return mOperation;
}
EnqueueRunnable then performs the actual enqueue:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class EnqueueRunnable implements Runnable {
private final WorkContinuationImpl mWorkContinuation;
private final OperationImpl mOperation;
public EnqueueRunnable(@NonNull WorkContinuationImpl workContinuation) {
mWorkContinuation = workContinuation;
mOperation = new OperationImpl();
}
@Override
public void run() {
try {
// Check the continuation graph for cycles
if (mWorkContinuation.hasCycles()) {
throw new IllegalStateException(
String.format("WorkContinuation has cycles (%s)", mWorkContinuation));
}
// Persist the work to the database; the return value says whether scheduling is needed
boolean needsScheduling = addToDatabase();
if (needsScheduling) {
// Enable RescheduleReceiver, only when there are Worker's that need scheduling.
final Context context =
mWorkContinuation.getWorkManagerImpl().getApplicationContext();
PackageManagerHelper.setComponentEnabled(context, RescheduleReceiver.class, true);
// Schedule the work
scheduleWorkInBackground();
}
mOperation.setState(Operation.SUCCESS);
} catch (Throwable exception) {
mOperation.setState(new Operation.State.FAILURE(exception));
}
}
}
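The cycle check at the start of run() guards against a continuation whose work ids reappear among its own ancestors. The library's hasCycles() is not shown in this article, so the following is only a simplified plain-Java sketch of the idea (a depth-first walk over parents that tracks the ids on the current path); the Continuation class here is a hypothetical stand-in for WorkContinuationImpl.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified cycle check over a continuation graph (not the library's exact algorithm).
class CycleCheckSketch {
    // Hypothetical stand-in for WorkContinuationImpl: work ids plus parent continuations.
    static final class Continuation {
        final List<String> ids;
        final List<Continuation> parents;

        Continuation(List<String> ids, Continuation... parents) {
            this.ids = new ArrayList<>(ids);
            this.parents = Arrays.asList(parents);
        }
    }

    static boolean hasCycles(Continuation continuation) {
        return hasCycles(continuation, new HashSet<String>());
    }

    // onPath holds the ids of every continuation between the start node and this one.
    private static boolean hasCycles(Continuation continuation, Set<String> onPath) {
        for (String id : continuation.ids) {
            if (onPath.contains(id)) {
                return true; // the same work appears twice on one ancestry path
            }
        }
        onPath.addAll(continuation.ids);
        for (Continuation parent : continuation.parents) {
            if (hasCycles(parent, onPath)) {
                return true;
            }
        }
        onPath.removeAll(continuation.ids);
        return false;
    }

    public static void main(String[] args) {
        Continuation a = new Continuation(Collections.singletonList("a"));
        Continuation b = new Continuation(Collections.singletonList("b"), a);
        // "a" depends on b, which already depends on "a": a cycle.
        Continuation again = new Continuation(Collections.singletonList("a"), b);
        System.out.println(hasCycles(b));
        System.out.println(hasCycles(again));
    }
}
```

A diamond-shaped graph, where two branches share one ancestor, is not a cycle; removing the ids from the path after each branch is what keeps the sketch from flagging it.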
addToDatabase():
@VisibleForTesting
public boolean addToDatabase() {
WorkManagerImpl workManagerImpl = mWorkContinuation.getWorkManagerImpl();
WorkDatabase workDatabase = workManagerImpl.getWorkDatabase();
workDatabase.beginTransaction();
try {
boolean needsScheduling = processContinuation(mWorkContinuation);
workDatabase.setTransactionSuccessful();
return needsScheduling;
} finally {
workDatabase.endTransaction();
}
}
private static boolean processContinuation(@NonNull WorkContinuationImpl workContinuation) {
boolean needsScheduling = false;
List<WorkContinuationImpl> parents = workContinuation.getParents();
if (parents != null) {
for (WorkContinuationImpl parent : parents) {
// When chaining off a completed continuation we need to pay
// attention to parents that may have been marked as enqueued before.
if (!parent.isEnqueued()) {
needsScheduling |= processContinuation(parent);
} else {
Logger.get().warning(TAG, String.format("Already enqueued work ids (%s).",
TextUtils.join(", ", parent.getIds())));
}
}
}
needsScheduling |= enqueueContinuation(workContinuation);
return needsScheduling;
}
The key step is scheduleWorkInBackground(), which delegates the scheduling to Schedulers.schedule():
@VisibleForTesting
public void scheduleWorkInBackground() {
WorkManagerImpl workManager = mWorkContinuation.getWorkManagerImpl();
Schedulers.schedule(
workManager.getConfiguration(),
workManager.getWorkDatabase(),
workManager.getSchedulers());
}
public static void schedule(
@NonNull Configuration configuration,
@NonNull WorkDatabase workDatabase,
List<Scheduler> schedulers) {
if (schedulers == null || schedulers.size() == 0) {
return;
}
WorkSpecDao workSpecDao = workDatabase.workSpecDao();
List<WorkSpec> eligibleWorkSpecs;
// Begin the database transaction
workDatabase.beginTransaction();
try {
eligibleWorkSpecs = workSpecDao.getEligibleWorkForScheduling(
configuration.getMaxSchedulerLimit());
if (eligibleWorkSpecs != null && eligibleWorkSpecs.size() > 0) {
long now = System.currentTimeMillis();
// Iterate over the eligible WorkSpecs and mark each one as scheduled in the database
// Mark all the WorkSpecs as scheduled.
// Calls to Scheduler#schedule() could potentially result in more schedules
// on a separate thread. Therefore, this needs to be done first.
for (WorkSpec workSpec : eligibleWorkSpecs) {
workSpecDao.markWorkSpecScheduled(workSpec.id, now);
}
}
workDatabase.setTransactionSuccessful();
} finally {
workDatabase.endTransaction();
}
if (eligibleWorkSpecs != null && eligibleWorkSpecs.size() > 0) {
WorkSpec[] eligibleWorkSpecsArray = eligibleWorkSpecs.toArray(new WorkSpec[0]);
// Hand the work to every Scheduler, GreedyScheduler among them
// Delegate to the underlying scheduler.
for (Scheduler scheduler : schedulers) {
scheduler.schedule(eligibleWorkSpecsArray);
}
}
}
Now inside GreedyScheduler:
@Override
public void schedule(@NonNull WorkSpec... workSpecs) {
if (mIsMainProcess == null) {
// The default process name is the package name.
mIsMainProcess = TextUtils.equals(mContext.getPackageName(), getProcessName());
}
if (!mIsMainProcess) {
Logger.get().info(TAG, "Ignoring schedule request in non-main process");
return;
}
registerExecutionListenerIfNeeded();
// Keep track of the list of new WorkSpecs whose constraints need to be tracked.
// Add them to the known list of constrained WorkSpecs and call replace() on
// WorkConstraintsTracker. That way we only need to synchronize on the part where we
// are updating mConstrainedWorkSpecs.
List<WorkSpec> constrainedWorkSpecs = new ArrayList<>();
List<String> constrainedWorkSpecIds = new ArrayList<>();
for (WorkSpec workSpec : workSpecs) {
if (workSpec.state == WorkInfo.State.ENQUEUED
&& !workSpec.isPeriodic()
&& workSpec.initialDelay == 0L
&& !workSpec.isBackedOff()) {
// The work has constraints
if (workSpec.hasConstraints()) {
if (SDK_INT >= 23 && workSpec.constraints.requiresDeviceIdle()) {
// Ignore requests that have an idle mode constraint.
Logger.get().debug(TAG,
String.format("Ignoring WorkSpec %s, Requires device idle.",
workSpec));
} else if (SDK_INT >= 24 && workSpec.constraints.hasContentUriTriggers()) {
// Ignore requests that have content uri triggers.
Logger.get().debug(TAG,
String.format("Ignoring WorkSpec %s, Requires ContentUri triggers.",
workSpec));
} else {
constrainedWorkSpecs.add(workSpec);
constrainedWorkSpecIds.add(workSpec.id);
}
// The work has no constraints
} else {
Logger.get().debug(TAG, String.format("Starting work for %s", workSpec.id));
mWorkManagerImpl.startWork(workSpec.id);
}
}
}
// onExecuted() which is called on the main thread also modifies the list of mConstrained
// WorkSpecs. Therefore we need to lock here.
synchronized (mLock) {
if (!constrainedWorkSpecs.isEmpty()) {
Logger.get().debug(TAG, String.format("Starting tracking for [%s]",
TextUtils.join(",", constrainedWorkSpecIds)));
mConstrainedWorkSpecs.addAll(constrainedWorkSpecs);
mWorkConstraintsTracker.replace(mConstrainedWorkSpecs);
}
}
}
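The branching inside schedule() boils down to a small decision per WorkSpec. The plain-Java sketch below restates that decision from the listing above; the Spec class and the Decision enum are hypothetical simplifications, and the boolean fields stand in for the corresponding WorkSpec checks.

```java
// Restates GreedyScheduler's per-WorkSpec decision as a pure function (sketch only).
class GreedyDecisionSketch {
    enum Decision { NOT_HANDLED, IGNORED, TRACK_CONSTRAINTS, START_NOW }

    // Hypothetical, flattened view of the WorkSpec fields that schedule() inspects.
    static final class Spec {
        boolean enqueued = true;
        boolean periodic;
        long initialDelay;
        boolean backedOff;
        boolean hasConstraints;
        boolean requiresDeviceIdle;
        boolean hasContentUriTriggers;
    }

    static Decision decide(Spec spec, int sdkInt) {
        // Only enqueued, one-time, undelayed, not-backed-off work is considered.
        boolean candidate = spec.enqueued
                && !spec.periodic
                && spec.initialDelay == 0L
                && !spec.backedOff;
        if (!candidate) {
            return Decision.NOT_HANDLED;
        }
        if (!spec.hasConstraints) {
            return Decision.START_NOW; // handed to WorkManagerImpl.startWork()
        }
        if (sdkInt >= 23 && spec.requiresDeviceIdle) {
            return Decision.IGNORED;
        }
        if (sdkInt >= 24 && spec.hasContentUriTriggers) {
            return Decision.IGNORED;
        }
        return Decision.TRACK_CONSTRAINTS; // added to the constraints tracker
    }

    public static void main(String[] args) {
        Spec plain = new Spec();
        Spec constrained = new Spec();
        constrained.hasConstraints = true;
        System.out.println(decide(plain, 28));
        System.out.println(decide(constrained, 28));
    }
}
```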
Work without constraints is handed straight to WorkManagerImpl to start:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public void startWork(@NonNull String workSpecId) {
startWork(workSpecId, null);
}
public void startWork(
@NonNull String workSpecId,
@Nullable WorkerParameters.RuntimeExtras runtimeExtras) {
mWorkTaskExecutor
.executeOnBackgroundThread(
new StartWorkRunnable(this, workSpecId, runtimeExtras));
}
StartWorkRunnable:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class StartWorkRunnable implements Runnable {
private WorkManagerImpl mWorkManagerImpl;
private String mWorkSpecId;
private WorkerParameters.RuntimeExtras mRuntimeExtras;
public StartWorkRunnable(
WorkManagerImpl workManagerImpl,
String workSpecId,
WorkerParameters.RuntimeExtras runtimeExtras) {
mWorkManagerImpl = workManagerImpl;
mWorkSpecId = workSpecId;
mRuntimeExtras = runtimeExtras;
}
@Override
public void run() {
// Processor
mWorkManagerImpl.getProcessor().startWork(mWorkSpecId, mRuntimeExtras);
}
}
Processor's startWork() then runs the work:
public boolean startWork(
@NonNull String id,
@Nullable WorkerParameters.RuntimeExtras runtimeExtras) {
WorkerWrapper workWrapper;
synchronized (mLock) {
// Work may get triggered multiple times if they have passing constraints
// and new work with those constraints are added.
if (mEnqueuedWorkMap.containsKey(id)) {
Logger.get().debug(
TAG,
String.format("Work %s is already enqueued for processing", id));
return false;
}
// Wrapper object around the Worker
workWrapper =
new WorkerWrapper.Builder(
mAppContext,
mConfiguration,
mWorkTaskExecutor,
this,
mWorkDatabase,
id)
.withSchedulers(mSchedulers)
.withRuntimeExtras(runtimeExtras)
.build();
ListenableFuture<Boolean> future = workWrapper.getFuture();
future.addListener(
new FutureListener(this, id, future),
mWorkTaskExecutor.getMainThreadExecutor());
mEnqueuedWorkMap.put(id, workWrapper);
}
// Run the work
mWorkTaskExecutor.getBackgroundExecutor().execute(workWrapper);
Logger.get().debug(TAG, String.format("%s: processing %s", getClass().getSimpleName(), id));
return true;
}
WorkerWrapper is a Runnable:
@WorkerThread
@Override
public void run() {
mTags = mWorkTagDao.getTagsForWorkSpecId(mWorkSpecId);
mWorkDescription = createWorkDescription(mTags);
runWorker();
}
private void runWorker() {
if (tryCheckForInterruptionAndResolve()) {
return;
}
mWorkDatabase.beginTransaction();
try {
mWorkSpec = mWorkSpecDao.getWorkSpec(mWorkSpecId);
if (mWorkSpec == null) {
Logger.get().error(
TAG,
String.format("Didn't find WorkSpec for id %s", mWorkSpecId));
resolve(false);
return;
}
// Do a quick check to make sure we don't need to bail out in case this work is already
// running, finished, or is blocked.
if (mWorkSpec.state != ENQUEUED) {
resolveIncorrectStatus();
mWorkDatabase.setTransactionSuccessful();
Logger.get().debug(TAG,
String.format("%s is not in ENQUEUED state. Nothing more to do.",
mWorkSpec.workerClassName));
return;
}
// Case 1:
// Ensure that Workers that are backed off are only executed when they are supposed to.
// GreedyScheduler can schedule WorkSpecs that have already been backed off because
// it is holding on to snapshots of WorkSpecs. So WorkerWrapper needs to determine
// if the ListenableWorker is actually eligible to execute at this point in time.
// Case 2:
// On API 23, we double scheduler Workers because JobScheduler prefers batching.
// So is the Work is periodic, we only need to execute it once per interval.
// Also potential bugs in the platform may cause a Job to run more than once.
if (mWorkSpec.isPeriodic() || mWorkSpec.isBackedOff()) {
long now = System.currentTimeMillis();
// Allow first run of a PeriodicWorkRequest
// to go through. This is because when periodStartTime=0;
// calculateNextRunTime() always > now.
// For more information refer to b/124274584
boolean isFirstRun = mWorkSpec.periodStartTime == 0;
if (!isFirstRun && now < mWorkSpec.calculateNextRunTime()) {
Logger.get().debug(TAG,
String.format(
"Delaying execution for %s because it is being executed "
+ "before schedule.",
mWorkSpec.workerClassName));
// For AlarmManager implementation we need to reschedule this kind of Work.
// This is not a problem for JobScheduler because we will only reschedule
// work if JobScheduler is unaware of a jobId.
resolve(true);
return;
}
}
// Needed for nested transactions, such as when we're in a dependent work request when
// using a SynchronousExecutor.
mWorkDatabase.setTransactionSuccessful();
} finally {
mWorkDatabase.endTransaction();
}
// Merge inputs. This can be potentially expensive code, so this should not be done inside
// a database transaction.
Data input;
if (mWorkSpec.isPeriodic()) {
input = mWorkSpec.input;
} else {
InputMergerFactory inputMergerFactory = mConfiguration.getInputMergerFactory();
String inputMergerClassName = mWorkSpec.inputMergerClassName;
InputMerger inputMerger =
inputMergerFactory.createInputMergerWithDefaultFallback(inputMergerClassName);
if (inputMerger == null) {
Logger.get().error(TAG, String.format("Could not create Input Merger %s",
mWorkSpec.inputMergerClassName));
setFailedAndResolve();
return;
}
List<Data> inputs = new ArrayList<>();
inputs.add(mWorkSpec.input);
inputs.addAll(mWorkSpecDao.getInputsFromPrerequisites(mWorkSpecId));
input = inputMerger.merge(inputs);
}
WorkerParameters params = new WorkerParameters(
UUID.fromString(mWorkSpecId),
input,
mTags,
mRuntimeExtras,
mWorkSpec.runAttemptCount,
mConfiguration.getExecutor(),
mWorkTaskExecutor,
mConfiguration.getWorkerFactory(),
new WorkProgressUpdater(mWorkDatabase, mWorkTaskExecutor),
new WorkForegroundUpdater(mForegroundProcessor, mWorkTaskExecutor));
// Not always creating a worker here, as the WorkerWrapper.Builder can set a worker override
// in test mode.
if (mWorker == null) {
mWorker = mConfiguration.getWorkerFactory().createWorkerWithDefaultFallback(
mAppContext,
mWorkSpec.workerClassName,
params);
}
if (mWorker == null) {
Logger.get().error(TAG,
String.format("Could not create Worker %s", mWorkSpec.workerClassName));
setFailedAndResolve();
return;
}
if (mWorker.isUsed()) {
Logger.get().error(TAG,
String.format("Received an already-used Worker %s; WorkerFactory should return "
+ "new instances",
mWorkSpec.workerClassName));
setFailedAndResolve();
return;
}
mWorker.setUsed();
// Try to set the work to the running state. Note that this may fail because another thread
// may have modified the DB since we checked last at the top of this function.
if (trySetRunning()) {
if (tryCheckForInterruptionAndResolve()) {
return;
}
final SettableFuture<ListenableWorker.Result> future = SettableFuture.create();
// Call mWorker.startWork() on the main thread.
mWorkTaskExecutor.getMainThreadExecutor()
.execute(new Runnable() {
@Override
public void run() {
try {
Logger.get().debug(TAG, String.format("Starting work for %s",
mWorkSpec.workerClassName));
// Run the task defined in the Worker
mInnerFuture = mWorker.startWork();
future.setFuture(mInnerFuture);
} catch (Throwable e) {
future.setException(e);
}
}
});
// Avoid synthetic accessors.
final String workDescription = mWorkDescription;
future.addListener(new Runnable() {
@Override
@SuppressLint("SyntheticAccessor")
public void run() {
try {
// If the ListenableWorker returns a null result treat it as a failure.
ListenableWorker.Result result = future.get();
if (result == null) {
Logger.get().error(TAG, String.format(
"%s returned a null result. Treating it as a failure.",
mWorkSpec.workerClassName));
} else {
Logger.get().debug(TAG, String.format("%s returned a %s result.",
mWorkSpec.workerClassName, result));
mResult = result;
}
} catch (CancellationException exception) {
// Cancellations need to be treated with care here because innerFuture
// cancellations will bubble up, and we need to gracefully handle that.
Logger.get().info(TAG, String.format("%s was cancelled", workDescription),
exception);
} catch (InterruptedException | ExecutionException exception) {
Logger.get().error(TAG,
String.format("%s failed because it threw an exception/error",
workDescription), exception);
} finally {
onWorkFinished();
}
}
}, mWorkTaskExecutor.getBackgroundExecutor());
} else {
resolveIncorrectStatus();
}
}
Finally, Worker's startWork() calls the doWork() that we override:
@Override
public final @NonNull ListenableFuture<Result> startWork() {
mFuture = SettableFuture.create();
getBackgroundExecutor().execute(new Runnable() {
@Override
public void run() {
try {
// Run doWork() on the background executor and capture its result
Result result = doWork();
mFuture.set(result);
} catch (Throwable throwable) {
mFuture.setException(throwable);
}
}
});
return mFuture;
}
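The pattern in startWork() (return a future immediately, complete it from a background executor with whatever doWork() produces or throws) can be reproduced in plain Java. The sketch below substitutes CompletableFuture for the library's SettableFuture and a String for Result; WorkerSketch is a hypothetical class, not the real Worker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java sketch of the startWork()/doWork() split (not library code).
abstract class WorkerSketch {
    private final Executor backgroundExecutor;

    WorkerSketch(Executor backgroundExecutor) {
        this.backgroundExecutor = backgroundExecutor;
    }

    // Subclasses put their synchronous work here, like Worker.doWork().
    abstract String doWork() throws Exception;

    // Returns at once; the future completes when doWork() finishes or throws.
    final CompletableFuture<String> startWork() {
        final CompletableFuture<String> future = new CompletableFuture<>();
        backgroundExecutor.execute(() -> {
            try {
                future.complete(doWork());
            } catch (Throwable throwable) {
                future.completeExceptionally(throwable);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        WorkerSketch worker = new WorkerSketch(executor) {
            @Override
            String doWork() {
                return "success";
            }
        };
        System.out.println(worker.startWork().join());
        executor.shutdown();
    }
}
```

Because exceptions are routed into the future rather than thrown on the executor thread, the caller (WorkerWrapper in the real code) can treat a thrown exception as a failed run.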
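2.3 Constraint-based execution
Suppose a task requires network connectivity as a constraint. If the network is disconnected after the task is enqueued but before it runs, and is reconnected some time later, the task starts as soon as connectivity returns. The mechanism is as follows.
WorkManager contains a BroadcastReceiver, ConstraintProxy. When the network goes from disconnected to connected, the constraint state changes and ConstraintProxy receives a broadcast: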
abstract class ConstraintProxy extends BroadcastReceiver {
@Override
public void onReceive(Context context, Intent intent) {
Intent constraintChangedIntent = CommandHandler.createConstraintsChangedIntent(context);
context.startService(constraintChangedIntent);
}
}
-----------------------------------------------------------------
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class CommandHandler implements ExecutionListener {
static Intent createConstraintsChangedIntent(@NonNull Context context) {
// SystemAlarmService is a Service declared by the WorkManager library itself, not an OS-level system service
Intent intent = new Intent(context, SystemAlarmService.class);
intent.setAction(ACTION_CONSTRAINTS_CHANGED);
return intent;
}
}
onReceive() starts SystemAlarmService with the ACTION_CONSTRAINTS_CHANGED action:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class SystemAlarmService extends LifecycleService
implements SystemAlarmDispatcher.CommandsCompletedListener {
@Override
public void onCreate() {
super.onCreate();
initializeDispatcher();
mIsShutdown = false;
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
super.onStartCommand(intent, flags, startId);
if (mIsShutdown) {
Logger.get().info(TAG,
"Re-initializing SystemAlarmDispatcher after a request to shut-down.");
// Destroy the old dispatcher to complete it's lifecycle.
mDispatcher.onDestroy();
// Create a new dispatcher to setup a new lifecycle.
initializeDispatcher();
// Set mIsShutdown to false, to correctly accept new commands.
mIsShutdown = false;
}
if (intent != null) {
// Add the Intent to mDispatcher
mDispatcher.add(intent, startId);
}
// If the service were to crash, we want all unacknowledged Intents to get redelivered.
return Service.START_REDELIVER_INTENT;
}
}
SystemAlarmDispatcher then handles the command:
@MainThread
public boolean add(@NonNull final Intent intent, final int startId) {
...
intent.putExtra(KEY_START_ID, startId);
synchronized (mIntents) {
boolean hasCommands = !mIntents.isEmpty();
mIntents.add(intent);
if (!hasCommands) {
// Only call processCommand if this is the first command.
// The call to dequeueAndCheckForCompletion will process the remaining commands
// in the order that they were added.
processCommand();
}
}
return true;
}
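The comment in add() describes a serial queue: processing is kicked off only for the first command, and each completion triggers the next one. A minimal plain-Java sketch of that discipline follows. It is an assumption-laden simplification: commands are Strings instead of Intents, processing is synchronous, and completion is signalled by calling dequeueAndCheckForCompletion() by hand (the name is borrowed from the comment; the body is a guess at the behaviour it describes).

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a "process only the head, continue on completion" command queue.
class CommandQueueSketch {
    private final List<String> pending = new ArrayList<>();
    // Commands in the order their processing started (exposed for inspection).
    final List<String> processed = new ArrayList<>();

    // Enqueue a command; start processing only if nothing was already queued.
    boolean add(String command) {
        synchronized (pending) {
            boolean hasCommands = !pending.isEmpty();
            pending.add(command);
            if (!hasCommands) {
                processCommand();
            }
        }
        return true;
    }

    // Processes the command at the head of the queue without removing it.
    private void processCommand() {
        processed.add(pending.get(0));
    }

    // Called when the head command has finished: drop it and move to the next.
    void dequeueAndCheckForCompletion() {
        synchronized (pending) {
            if (pending.isEmpty()) {
                return;
            }
            pending.remove(0);
            if (!pending.isEmpty()) {
                processCommand();
            }
        }
    }

    public static void main(String[] args) {
        CommandQueueSketch queue = new CommandQueueSketch();
        queue.add("A"); // processed immediately
        queue.add("B"); // waits behind A
        queue.dequeueAndCheckForCompletion(); // A done, B starts
        System.out.println(queue.processed);
    }
}
```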
processCommand() in turn hands the command to CommandHandler:
@MainThread
@SuppressWarnings("FutureReturnValueIgnored")
private void processCommand() {
assertMainThread();
PowerManager.WakeLock processCommandLock =
WakeLocks.newWakeLock(mContext, PROCESS_COMMAND_TAG);
try {
processCommandLock.acquire();
// Process commands on the background thread.
mWorkManager.getWorkTaskExecutor().executeOnBackgroundThread(new Runnable() {
@Override
public void run() {
synchronized (mIntents) {
mCurrentIntent = mIntents.get(0);
}
if (mCurrentIntent != null) {
final String action = mCurrentIntent.getAction();
final int startId = mCurrentIntent.getIntExtra(KEY_START_ID,
DEFAULT_START_ID);
Logger.get().debug(TAG,
String.format("Processing command %s, %s", mCurrentIntent,
startId));
final PowerManager.WakeLock wakeLock = WakeLocks.newWakeLock(
mContext,
String.format("%s (%s)", action, startId));
try {
Logger.get().debug(TAG, String.format(
"Acquiring operation wake lock (%s) %s",
action,
wakeLock));
wakeLock.acquire();
// CommandHandler is responsible for handling every Intent
mCommandHandler.onHandleIntent(mCurrentIntent, startId,
SystemAlarmDispatcher.this);
} catch (Throwable throwable) {
Logger.get().error(
TAG,
"Unexpected error in onHandleIntent",
throwable);
} finally {
Logger.get().debug(
TAG,
String.format(
"Releasing operation wake lock (%s) %s",
action,
wakeLock));
wakeLock.release();
// Check if we have processed all commands
postOnMainThread(
new DequeueAndCheckForCompletion(SystemAlarmDispatcher.this));
}
}
}
});
} finally {
processCommandLock.release();
}
}
CommandHandler is where the Intent's action is actually dispatched:
@WorkerThread
void onHandleIntent(
@NonNull Intent intent,
int startId,
@NonNull SystemAlarmDispatcher dispatcher) {
String action = intent.getAction();
if (ACTION_CONSTRAINTS_CHANGED.equals(action)) {
handleConstraintsChanged(intent, startId, dispatcher);
} else if (ACTION_RESCHEDULE.equals(action)) {
handleReschedule(intent, startId, dispatcher);
} else {
Bundle extras = intent.getExtras();
if (!hasKeys(extras, KEY_WORKSPEC_ID)) {
Logger.get().error(TAG,
String.format("Invalid request for %s, requires %s.",
action,
KEY_WORKSPEC_ID));
} else {
if (ACTION_SCHEDULE_WORK.equals(action)) {
handleScheduleWorkIntent(intent, startId, dispatcher);
} else if (ACTION_DELAY_MET.equals(action)) {
handleDelayMet(intent, startId, dispatcher);
} else if (ACTION_STOP_WORK.equals(action)) {
handleStopWork(intent, dispatcher);
} else if (ACTION_EXECUTION_COMPLETED.equals(action)) {
handleExecutionCompleted(intent, startId);
} else {
Logger.get().warning(TAG, String.format("Ignoring intent %s", intent));
}
}
}
}
The ACTION_CONSTRAINTS_CHANGED checked in the first if branch is exactly the action passed in earlier, so execution enters handleConstraintsChanged():
private void handleConstraintsChanged(
@NonNull Intent intent, int startId,
@NonNull SystemAlarmDispatcher dispatcher) {
// Constraints changed command handler is synchronous. No cleanup
// is necessary.
ConstraintsCommandHandler changedCommandHandler =
new ConstraintsCommandHandler(mContext, startId, dispatcher);
changedCommandHandler.handleConstraintsChanged();
}
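ConstraintsCommandHandler does not run the work itself. For every eligible WorkSpec it creates a new Intent whose action is ACTION_DELAY_MET, in place of the original ACTION_CONSTRAINTS_CHANGED: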
@WorkerThread
void handleConstraintsChanged() {
List<WorkSpec> candidates = mDispatcher.getWorkManager().getWorkDatabase()
.workSpecDao()
.getScheduledWork();
// Update constraint proxy to potentially disable proxies for previously
// completed WorkSpecs.
ConstraintProxy.updateAll(mContext, candidates);
// This needs to be done to populate matching WorkSpec ids in every constraint controller.
mWorkConstraintsTracker.replace(candidates);
List<WorkSpec> eligibleWorkSpecs = new ArrayList<>(candidates.size());
// Filter candidates should have already been scheduled.
long now = System.currentTimeMillis();
for (WorkSpec workSpec : candidates) {
String workSpecId = workSpec.id;
long triggerAt = workSpec.calculateNextRunTime();
if (now >= triggerAt && (!workSpec.hasConstraints()
|| mWorkConstraintsTracker.areAllConstraintsMet(workSpecId))) {
eligibleWorkSpecs.add(workSpec);
}
}
for (WorkSpec workSpec : eligibleWorkSpecs) {
String workSpecId = workSpec.id;
// Create a new Intent whose action is ACTION_DELAY_MET
Intent intent = CommandHandler.createDelayMetIntent(mContext, workSpecId);
mDispatcher.postOnMainThread(
new SystemAlarmDispatcher.AddRunnable(mDispatcher, intent, mStartId));
}
mWorkConstraintsTracker.reset();
}
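The filter in the middle of handleConstraintsChanged() keeps a WorkSpec only if its next run time has arrived and it either has no constraints or has all of them met. Restated as a plain-Java sketch (the Spec class is a hypothetical flattening of WorkSpec, with constraintsMet standing in for the tracker's areAllConstraintsMet() answer):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Restates the eligibility filter from handleConstraintsChanged() (sketch only).
class EligibilityFilterSketch {
    // Hypothetical, flattened view of the WorkSpec data the filter looks at.
    static final class Spec {
        final String id;
        final long nextRunTime;
        final boolean hasConstraints;
        final boolean constraintsMet;

        Spec(String id, long nextRunTime, boolean hasConstraints, boolean constraintsMet) {
            this.id = id;
            this.nextRunTime = nextRunTime;
            this.hasConstraints = hasConstraints;
            this.constraintsMet = constraintsMet;
        }
    }

    static List<String> eligibleIds(List<Spec> candidates, long now) {
        List<String> eligible = new ArrayList<>();
        for (Spec spec : candidates) {
            boolean due = now >= spec.nextRunTime;
            boolean unblocked = !spec.hasConstraints || spec.constraintsMet;
            if (due && unblocked) {
                eligible.add(spec.id);
            }
        }
        return eligible;
    }

    public static void main(String[] args) {
        List<Spec> candidates = Arrays.asList(
                new Spec("due-unconstrained", 500L, false, false),
                new Spec("due-constraints-met", 500L, true, true),
                new Spec("due-constraints-unmet", 500L, true, false),
                new Spec("not-due-yet", 2000L, false, false));
        System.out.println(eligibleIds(candidates, 1000L));
    }
}
```

Each id that passes the filter is what the real code wraps in an ACTION_DELAY_MET Intent and posts back to the dispatcher.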
CommandHandler creates the Intent with action ACTION_DELAY_MET:
static Intent createDelayMetIntent(@NonNull Context context, @NonNull String workSpecId) {
Intent intent = new Intent(context, SystemAlarmService.class);
intent.setAction(ACTION_DELAY_MET);
intent.putExtra(KEY_WORKSPEC_ID, workSpecId);
return intent;
}
Back in CommandHandler's onHandleIntent(), there is a branch for ACTION_DELAY_MET, which calls handleDelayMet():
private void handleDelayMet(
@NonNull Intent intent,
int startId,
@NonNull SystemAlarmDispatcher dispatcher) {
Bundle extras = intent.getExtras();
synchronized (mLock) {
String workSpecId = extras.getString(KEY_WORKSPEC_ID);
// Check to see if we are already handling an ACTION_DELAY_MET for the WorkSpec.
// If we are, then there is nothing for us to do.
if (!mPendingDelayMet.containsKey(workSpecId)) {
DelayMetCommandHandler delayMetCommandHandler =
new DelayMetCommandHandler(mContext, startId, workSpecId, dispatcher);
mPendingDelayMet.put(workSpecId, delayMetCommandHandler);
delayMetCommandHandler.handleProcessWork();
} else {
// log...
}
}
}
DelayMetCommandHandler's handleProcessWork() leads to onAllConstraintsMet(), which calls Processor's startWork(); that in turn ends up running the doWork() we override in our Worker:
@WorkerThread
void handleProcessWork() {
...
if (!mHasConstraints) {
onAllConstraintsMet(Collections.singletonList(mWorkSpecId));
} else {
// Allow tracker to report constraint changes
mWorkConstraintsTracker.replace(Collections.singletonList(workSpec));
}
}
@Override
public void onAllConstraintsMet(@NonNull List<String> workSpecIds) {
// WorkConstraintsTracker will call onAllConstraintsMet with list of workSpecs whose
// constraints are met. Ensure the workSpecId we are interested is part of the list
// before we call Processor#startWork().
if (!workSpecIds.contains(mWorkSpecId)) {
return;
}
synchronized (mLock) {
if (mCurrentState == STATE_INITIAL) {
mCurrentState = STATE_START_REQUESTED;
// Call Processor's startWork()
// Constraints met, schedule execution
// Not using WorkManagerImpl#startWork() here because we need to know if the
// processor actually enqueued the work here.
boolean isEnqueued = mDispatcher.getProcessor().startWork(mWorkSpecId);
if (isEnqueued) {
// setup timers to enforce quotas on workers that have
// been enqueued
mDispatcher.getWorkTimer()
.startTimer(mWorkSpecId, WORK_PROCESSING_TIME_IN_MS, this);
} else {
// if we did not actually enqueue the work, it was enqueued before
// cleanUp and pretend this never happened.
cleanUp();
}
} else {
Logger.get().debug(TAG, String.format("Already started work for %s", mWorkSpecId));
}
}
}