当前位置: 首页 > article >正文

windows C#-生成和使用异步流(下)

异步流可提供更好的方法

异步流和关联语言支持解决了所有这些问题。 生成序列的代码现在可以使用 yield return 返回用 async 修饰符声明的方法中的元素。 可以通过 await foreach 循环来使用异步流,就像通过 foreach 循环使用任何序列一样。

这些新语言功能依赖于添加到 .NET Standard 2.1 并在 .NET Core 3.0 中实现的三个新接口:

System.Collections.Generic.IAsyncEnumerable<T>
System.Collections.Generic.IAsyncEnumerator<T>
System.IAsyncDisposable

大多数 C# 开发人员都应该熟悉这三个接口。 它们的行为方式类似于其对应的同步对象:

System.Collections.Generic.IEnumerable<T>
System.Collections.Generic.IEnumerator<T>
System.IDisposable

可能不熟悉的一种类型是 System.Threading.Tasks.ValueTask。 ValueTask 结构提供了与 System.Threading.Tasks.Task 类类似的 API。 出于性能方面的原因,这些接口中使用了 ValueTask。

转换为异步流

接下来,转换 RunPagedQueryAsync 方法以生成异步流。 首先,更改 RunPagedQueryAsync 的签名以返回 IAsyncEnumerable<JToken>,并从参数列表删除取消令牌和进度对象,如以下代码所示:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

起始代码在检索页面时处理每个页面,如以下代码所示:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

将这三行替换为以下代码:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

还可以在此方法中删除前面的 finalResults 声明以及你修改的循环之后的 return 语句。

已完成更改以生成异步流。 已完成的方法应与以下代码类似:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

接下来,将使用集合的代码更改为使用异步流。 在 Main 中找到以下处理问题集合的代码:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

将该代码替换为以下 await foreach 循环: 

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

新接口 IAsyncEnumerator<T> 派生自 IAsyncDisposable。 这意味着在循环完成时,前面的循环会以异步方式释放流。 可以假设循环类似于以下代码: 

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

默认情况下,在捕获的上下文中处理流元素。 如果要禁用上下文捕获,请使用 TaskAsyncEnumerableExtensions.ConfigureAwait 扩展方法。 

异步流支持使用与其他 async 方法相同的协议的取消。 要支持取消,请按如下所示修改异步迭代器方法的签名:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

 System.Runtime.CompilerServices.EnumeratorCancellationAttribute 属性导致编译器生成 IAsyncEnumerator<T> 的代码,该代码使传递给 GetAsyncEnumerator 的令牌对作为该参数的异步迭代器的主体可见。 在 runQueryAsync 中,可以检查令牌的状态,并在请求时取消进一步的工作。

使用另一个扩展方法 WithCancellation,将取消标记传递给异步流。 可以按如下所示修改枚举问题的循环:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

可以从 asynchronous-programming/snippets 文件夹中的 dotnet/docs 存储库中获取已完成的文的代码。

运行完成的应用程序

再次运行该应用程序。 将其行为与初学者应用程序的行为进行对比。 会在结果的第一页可用立即对其进行枚举。 在请求和检索每个新页面时都会有一个可观察到的暂停,然后快速枚举下一页结果。 不需要 try / catch 块来处理取消:调用者可以停止枚举集合。 由于异步流在下载每个页面时生成结果,因此可以清楚地报告进度。 返回的每个问题的状态都无缝包含在 await foreach 循环中。 不需要回调对象即可跟踪进度。

通过检查代码,可以看到内存使用方面的改进。 不再需要在枚举所有结果之前分配一个集合来存储它们。 调用者可以决定如何使用结果,以及是否需要存储集合。

运行初学者应用程序和已完成的应用程序,可以自行观察实现之间的差异。 可以在完成本文后删除在开始学习本文时创建的 GitHub 访问令牌。 如果攻击者获得了对该令牌的访问权限,他们可以使用你的凭据来访问 GitHub API。

在本文中,你使用异步流从返回数据页的网络 API 读取单个项。 异步流还可以从股票行情自动收录器或传感器设备等“永不结束的流”读取内容。 对 MoveNextAsync 的调用将在下一项可用后立即返回它。


http://www.kler.cn/a/409950.html

相关文章:

  • 滑动窗口篇——如行云流水般的高效解法与智能之道(2)
  • Burp入门(1)
  • Vue 3 动态组件教程
  • 《文件操作》
  • [RabbitMQ] 重试机制+TTL+死信队列
  • 【tensorflow的安装步骤】
  • 具有多个表盘、心率传感器、指南针和游戏的 DIY 智能手表
  • 2024年跨行业跨领域工业互联网平台
  • 17.5k Star,ThingsBoard 一款开源、免费、功能全面的物联网 IoT 平台 -慧知开源充电桩平台
  • Linux中的共享内存
  • 【CSP CCF记录】201803-1第13次认证 跳一跳
  • matlab -炉温串级控制PID
  • 24.11.21深度学习
  • .NET Core发布网站报错 HTTP Error 500.31
  • 视频分析设备平台EasyCVR视频设备轨迹回放平台与应急布控球的视频监控方案
  • 嵌入式硬件杂谈(六)充电器原理 线性电源 开关电源 反激电源原理
  • 论文阅读:A fast, scalable and versatile tool for analysis of single-cell omics data
  • node.js nvm 安装和使用
  • 前端面试笔试(五)
  • 网络安全等级保护测评机构管理办法(全文)
  • 【前端学习笔记】Web API——BOM与DOM
  • Python 版本的 2024详细代码
  • AI安全:从现实关切到未来展望
  • Jmeter中的监听器
  • 信息收集(1)
  • WPF中的Button按钮中的PreviewMouseLeftButtonDown事件和MouseLeftButtonDown的区别