windows C#-生成和使用异步流(下)
异步流可提供更好的方法
异步流和关联语言支持解决了所有这些问题。 生成序列的代码现在可以使用 yield return 返回用 async 修饰符声明的方法中的元素。 可以通过 await foreach 循环来使用异步流,就像通过 foreach 循环使用任何序列一样。
这些新语言功能依赖于添加到 .NET Standard 2.1 并在 .NET Core 3.0 中实现的三个新接口:
System.Collections.Generic.IAsyncEnumerable<T>
System.Collections.Generic.IAsyncEnumerator<T>
System.IAsyncDisposable
大多数 C# 开发人员都应该熟悉这三个接口。 它们的行为方式类似于其对应的同步对象:
System.Collections.Generic.IEnumerable<T>
System.Collections.Generic.IEnumerator<T>
System.IDisposable
可能不熟悉的一种类型是 System.Threading.Tasks.ValueTask。 ValueTask 结构提供了与 System.Threading.Tasks.Task 类类似的 API。 出于性能方面的原因,这些接口中使用了 ValueTask。
转换为异步流
接下来,转换 RunPagedQueryAsync 方法以生成异步流。 首先,更改 RunPagedQueryAsync 的签名以返回 IAsyncEnumerable<JToken>,并从参数列表删除取消令牌和进度对象,如以下代码所示:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
起始代码在检索页面时处理每个页面,如以下代码所示:
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
将这三行替换为以下代码:
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
还可以在此方法中删除前面的 finalResults
声明以及你修改的循环之后的 return
语句。
已完成更改以生成异步流。 已完成的方法应与以下代码类似:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
接下来,将使用集合的代码更改为使用异步流。 在 Main
中找到以下处理问题集合的代码:
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
将该代码替换为以下 await foreach
循环:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
新接口 IAsyncEnumerator<T> 派生自 IAsyncDisposable。 这意味着在循环完成时,前面的循环会以异步方式释放流。 可以假设循环类似于以下代码:
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
默认情况下,在捕获的上下文中处理流元素。 如果要禁用上下文捕获,请使用 TaskAsyncEnumerableExtensions.ConfigureAwait 扩展方法。
异步流支持使用与其他 async 方法相同的协议的取消。 要支持取消,请按如下所示修改异步迭代器方法的签名:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
System.Runtime.CompilerServices.EnumeratorCancellationAttribute 属性导致编译器生成 IAsyncEnumerator<T> 的代码,该代码使传递给 GetAsyncEnumerator 的令牌对作为该参数的异步迭代器的主体可见。 在 runQueryAsync 中,可以检查令牌的状态,并在请求时取消进一步的工作。
使用另一个扩展方法 WithCancellation,将取消标记传递给异步流。 可以按如下所示修改枚举问题的循环:
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
可以从 asynchronous-programming/snippets 文件夹中的 dotnet/docs 存储库中获取已完成的文的代码。
运行完成的应用程序
再次运行该应用程序。 将其行为与初学者应用程序的行为进行对比。 会在结果的第一页可用立即对其进行枚举。 在请求和检索每个新页面时都会有一个可观察到的暂停,然后快速枚举下一页结果。 不需要 try / catch 块来处理取消:调用者可以停止枚举集合。 由于异步流在下载每个页面时生成结果,因此可以清楚地报告进度。 返回的每个问题的状态都无缝包含在 await foreach 循环中。 不需要回调对象即可跟踪进度。
通过检查代码,可以看到内存使用方面的改进。 不再需要在枚举所有结果之前分配一个集合来存储它们。 调用者可以决定如何使用结果,以及是否需要存储集合。
运行初学者应用程序和已完成的应用程序,可以自行观察实现之间的差异。 可以在完成本文后删除在开始学习本文时创建的 GitHub 访问令牌。 如果攻击者获得了对该令牌的访问权限,他们可以使用你的凭据来访问 GitHub API。
在本文中,你使用异步流从返回数据页的网络 API 读取单个项。 异步流还可以从股票行情自动收录器或传感器设备等“永不结束的流”读取内容。 对 MoveNextAsync 的调用将在下一项可用后立即返回它。