当前位置: 首页 > article >正文

C# .Net Core通过StreamLoad向Doris写入CSV数据

以下代码可以只关注StreamLoad具体实现。

1.创建StreamLoad返回值Model

public class StreamLoadResponse
{
	public long TxnId { get; set; }
	public string Label { get; set; }
	public string Comment { get; set; }
	public string TwoPhaseCommit { get; set; }
	public string Status { get; set; }
	public string Message { get; set; }
	public long NumberTotalRows { get; set; }
	public long NumberLoadedRows { get; set; }
	public long NumberFilteredRows { get; set; }
	public long NumberUnselectedRows { get; set; }
	public long LoadBytes { get; set; }
	public long LoadTimeMs { get; set; }
	public long BeginTxnTimeMs { get; set; }
	public long StreamLoadPutTimeMs { get; set; }
	public long ReadDataTimeMs { get; set; }
	public long WriteDataTimeMs { get; set; }
	public long CommitAndPublishTimeMs { get; set; }
}

2.创建Doris StreamLoad接口

public interface IDorisApiService
{
	/// <summary>
	/// 
	/// </summary>
	/// <param name="database">数据库</param>
	/// <param name="table">表</param>
	/// <param name="authorization">认证信息,格式 username:pwd</param>
	/// <param name="content">csv格式的字符串</param>
	/// <returns></returns>
	StreamLoadResponse StreamLoad(string database, string table, string authorization, string content);
}

3.实现接口,核心代码,逻辑并不复杂,组装一个http请求所需的内容。

需要注意的是:(1)示例csv格式的字符串分割符为‘\t’,而不是常用的逗号,这也是官方默认的分割方式,如果你想用其他的分隔符,需要在header里配置column_separator。建议不要用逗号,因为涉及到复杂的json字符串的时候,里面的逗号会导致解析异常,即便官方文档里有相关的处理方式(enclose),似乎仍然存在问题。(2)我们请求了两次,第一次请求会重定向到BE节点的地址,然后用此地址再次请求。这是正常的。(3)我们采用的format是csv_with_names,第一行是列名,请确保跟数据库table列顺序和数量保持一致(4)注意认证信息格式为username:password,数据库用户

public class DorisApiService : IDorisApiService
{
    private readonly HttpClient _httpClient;

    public DorisApiService(HttpClient httpClient)
    {
        _httpClient = httpClient;
    }
    public StreamLoadResponse StreamLoad(string database, string table, string authorization, string content)
    {
        var url = $"/api/{database}/{table}/_stream_load";
        var request = new HttpRequestMessage(HttpMethod.Put, url);

        request.Headers.Add("Expect", "100-continue");
        request.Headers.Add("format", "csv_with_names");
        request.Headers.Add("column_separator", "\t");
        request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", authorization);

        var response = _httpClient.Send(request, HttpCompletionOption.ResponseHeadersRead);
        if (response.StatusCode == HttpStatusCode.TemporaryRedirect || response.StatusCode == HttpStatusCode.RedirectKeepVerb)
        {
            var redirectUrl = response.Headers.Location.ToString();
            request = new HttpRequestMessage(HttpMethod.Put, redirectUrl)
            {
                Content = new StringContent(content, Encoding.UTF8, "text/plain")
            };

            request.Headers.Add("Expect", "100-continue");
            request.Headers.Add("format", "csv_with_names");
            request.Headers.Add("column_separator", "\t");
            request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", authorization);

            response = _httpClient.Send(request, HttpCompletionOption.ResponseHeadersRead);
        }

        string responseBody = response.Content.ReadAsStringAsync().Result;           
        if (response.IsSuccessStatusCode)
        {
            var streamLoadResponse = JsonSerializer.Deserialize<StreamLoadResponse>(responseBody);
            if (streamLoadResponse.Status == "Success")
            {
                return streamLoadResponse;
            }
            else
            {
                throw new Exception(responseBody);
            }
        }
        else
        {
            throw new Exception(responseBody);
        }
    }
}

4.Program配置

services.AddHttpClient<IDorisApiService, DorisApiService>(client =>
{
    //从配置文件获取Doris的请求地址和端口:settings.ApiHost
	client.BaseAddress = new Uri(settings.ApiHost);
	client.Timeout = TimeSpan.FromSeconds(300);
}).ConfigurePrimaryHttpMessageHandler(() => new HttpClientHandler
{
	AllowAutoRedirect = false
});


http://www.kler.cn/a/410413.html

相关文章:

  • Easyexcel(6-单元格合并)
  • ARM CCA机密计算安全模型之概述
  • LLC与反激电路设计【学习笔记】
  • 网络原理(一):应用层自定义协议的信息组织格式 初始 HTTP
  • Python酷库之旅-第三方库Pandas(237)
  • Leetcode207. 课程表(HOT100)
  • C# 创建快捷方式文件和硬链接文件
  • 大语言模型---通过数值梯度的方式计算损失值L对模型权重矩阵W的梯度;数值梯度的公式;数值梯度计算过程
  • macOS上进行Ant Design Pro实战教程(一)
  • 【51单片机】程序实验56.独立按键-矩阵按键
  • 【初阶数据与算法】线性表之顺序表的定义与实现
  • 跨平台开发_RTC程序设计:实时音视频权威指南 2
  • Web day02 Js Vue Ajax
  • Java的字符串操作(二)(代码示例)
  • spring的事务隔离?
  • IEC61850读服务器目录命令——GetServerDirectory介绍
  • Gitlab有趣而实用的功能
  • Ajax学习笔记,第一节:语法基础
  • 电影风格城市夜景旅拍Lr调色教程,手机滤镜PS+Lightroom预设下载!
  • 杂项驱动开发
  • 【JavaEE】Servlet:表白墙
  • CSS 样式入门:属性全知晓
  • Leetcode 组合
  • STM32WB55RG开发(5)----监测STM32WB连接状态
  • C#里怎么样访问文件时间
  • 《Shader入门精要》透明效果