当前位置: 首页 > article >正文

Doris的Routine Load方式消费Kafka数据进入Doris

假设kafka已有嵌套JSON数据格式为

{
    "appId": "10000",
    "platform": "YY",
    "userId": "007",
    "userAgent": "6",
    "event": "login",
    "package": "org.apache.doris",
    "properties": {
        "phoneNumber": "13814516235",
        "actionTime": "1694928000",
        "deviceID": "device123",
        "deviceType": "smartphone",
        "appVersion": "1.0.0",
        "networkType": "WiFi",
        "os": "Android",
        "userUID": "user123",
        "nickname": "小王",
        "clientIp": "192.168.1.1"
    },
    "clientIp": "10.225.36.85",
    "timestamp": "1694928000",
    "source": "mobileApp",
    "sessionId": "12554122524422"
}

1、创建建表语句

CREATE TABLE test_app_dwh.rt_ods_log_app_loginout (
    appId VARCHAR(20) NOT NULL COMMENT "应用ID",
    userId VARCHAR(20) NOT NULL COMMENT "用户ID",
    timestamp BIGINT COMMENT "时间戳",
    platform VARCHAR(20) NOT NULL COMMENT "平台",
    userAgent VARCHAR(20) NOT NULL COMMENT "用户代理",
    event VARCHAR(20) NOT NULL COMMENT "事件类型",
    package VARCHAR(100) NOT NULL COMMENT "包名",
    phoneNumber VARCHAR(20) COMMENT "电话号码",
    actionTime BIGINT COMMENT "动作时间戳",
    deviceID VARCHAR(50) COMMENT "设备ID",
    deviceType VARCHAR(20) COMMENT "设备类型",
    appVersion VARCHAR(20) COMMENT "应用版本",
    networkType VARCHAR(20) COMMENT "网络类型",
    os VARCHAR(20) COMMENT "操作系统",
    userUID VARCHAR(50) COMMENT "用户唯一标识",
    nickname VARCHAR(50) COMMENT "昵称",
    clientIp VARCHAR(20) COMMENT "客户端IP",
    source VARCHAR(20) COMMENT "来源",
    sessionId VARCHAR(50) COMMENT "会话ID"
)
DUPLICATE KEY(appId, userId, timestamp)
DISTRIBUTED BY HASH(appId) BUCKETS 1;

2、导入命令

CREATE ROUTINE LOAD test_game_dwh.kafkajob_rt_ods_log_app_loginout ON rt_ods_log_app_loginout
COLUMNS(appId, userId, timestamp, platform, userAgent, event, package, phoneNumber, actionTime, deviceID, deviceType, appVersion, networkType, os, userUID, nickname, clientIp, source, sessionId)
PROPERTIES
(
    "desired_concurrent_number" = "1",
    "format" = "json",
    "strict_mode" = "false",
    "jsonpaths" = "[\"$.appId\",\"$.userId\",\"$.timestamp\",\"$.platform\",\"$.userAgent\",\"$.event\",\"$.package\",\"$.properties.phoneNumber\",\"$.properties.actionTime\",\"$.properties.deviceID\",\"$.properties.deviceType\",\"$.properties.appVersion\",\"$.properties.networkType\",\"$.properties.os\",\"$.properties.userUID\",\"$.properties.nickname\",\"$.properties.clientIp\",\"$.source\",\"$.sessionId\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "ip1:9092,ip2:9092,ip3:9092",
    "kafka_topic" = "loginout",
    "property.group.id" = "kafka_job",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

最后kafka的数据就可以源源不断的存储到doris表里面了


http://www.kler.cn/a/319037.html

相关文章:

  • git没有识别出大写字母改成小写重命名的文件目录
  • 2023年MathorCup数学建模B题城市轨道交通列车时刻表优化问题解题全过程文档加程序
  • Docker 篇-Docker 详细安装、了解和使用 Docker 核心功能(数据卷、自定义镜像 Dockerfile、网络)
  • Django基础用法+Demo演示
  • 【121. 买卖股票的最佳时机】——贪心算法/动态规划
  • 解决Anaconda出现CondaHTTPError: HTTP 000 CONNECTION FAILED for url
  • Linux 一键部署Mysql 5.7.44
  • Spring源码-ConfigurationClassPostProcessor类解析spring相关注解
  • 从事新闻、出版、教育、药品和医疗器械、文化、广播电影电视节目等互联网信息服务小程序备案说明
  • AI 文生图快速入门教程:让 Stable Diffusion 更易于上手
  • vue基础面试题
  • 简单水印通过python去除
  • 数造科技荣获“2024爱分析·数据智能优秀厂商”
  • 传输大咖45 | 跨国传输大文件的高效、安全传输系统
  • 大数据Flink(一百二十一):Flink CDC基本介绍
  • 数据在内存中的存储(上)
  • Spring Boot 学习和使用
  • 大数据新视界 --大数据大厂之探索ES:大数据时代的高效搜索引擎实战攻略
  • 基于SpringBoot的CSGO赛事管理系统
  • 自动化等保测评:提升效率,降低成本的新路径
  • 科研绘图系列:R语言箱线图和连线图(boxplot linechart)
  • Vue.js 与 Flask/Django 后端配合:构建现代 Web 应用的最佳实践
  • uniapp实现触底分页加载
  • 微服务面试-修改nacos配置,不重启服务怎生效
  • Zerotier 内网穿透教程
  • TomCat乱码问题