当前位置: 首页 > article >正文

Java通过calcite实时读取kafka中的数据

引入maven依赖

        <dependency>

            <groupId>org.apache.calcite</groupId>

            <artifactId>calcite-kafka</artifactId>

            <version>1.28.0</version>

        </dependency>

测试代码

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.ResultSetMetaData;

import java.sql.SQLException;

import java.util.Properties;

public class CalciteDemo {

    public static void main(String[] args) throws SQLException {

        String model = "inline:" +

                "{\n" +

                "  \"version\": \"1.0\",\n" +

                "  \"defaultSchema\": \"KAFKA\",\n" +

                "  \"schemas\": [\n" +

                "    {\n" +

                "    \"name\": \"KAFKA\",\n" +

                "    \"tables\": [\n" +

                "      {\n" +

                "        \"name\": \"TEST_TABLE\",\n" +

                "        \"factory\": \"org.apache.calcite.adapter.kafka.KafkaTableFactory\",\n" +

                "        \"stream\": { \"stream\": true },\n" +

                "        \"operand\": {\n" +

                "          \"bootstrap.servers\": \"192.168.x.xx:9092\",\n" +

                "          \"topic.name\": \"my-cloud-events\",\n" +

                "          \"consumer.params\": {\n" +

                "            \"group.id\": \"calcite-ut-consumer\",\n" +

                "            \"key.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\n" +

                "            \"value.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\"\n" +

                "          }\n" +

                "        }\n" +

                "      }\n" +

                "    ]\n" +

                "    }\n" +

                "  ]\n" +

                "}";

        Properties info = new Properties();

        info.put("model", model);

        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);

        final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        final String sql7 = "SELECT STREAM * FROM \"KAFKA\".\"TEST_TABLE\"";

        print(calciteConnection,sql7);

        connection.close();

        calciteConnection.close();

    }

    public static void print(CalciteConnection calciteConnection, String sql7) throws SQLException {

        final PreparedStatement statement = calciteConnection.prepareStatement(sql7);

        final ResultSet resultSet = statement.executeQuery();

        ResultSetMetaData metadata = resultSet.getMetaData();

        while (resultSet.next()) {

            for (int i = 1; i <= metadata.getColumnCount(); i++) {

                System.out.print(metadata.getColumnLabel(i) + "=" + resultSet.getString(i) + ",");

            }

            System.out.println();

        }

    }

}

发送测试数据

运行结果


http://www.kler.cn/a/405149.html

相关文章:

  • 设计模式之创建模式篇
  • HarmonyOs鸿蒙开发实战(17)=>沉浸式效果第二种方案一组件安全区方案
  • 【CVE-2024-9413】SCP-Firmware漏洞:安全通告
  • js批量输入地址获取经纬度
  • Java基础夯实——2.6 Java中的锁
  • IDEA2023设置控制台日志输出到本地文件
  • 学习threejs,通过SkinnedMesh来创建骨骼和蒙皮动画
  • WSL2 ubuntu配置redis
  • Simulink学习笔记【PID UG联动仿真】
  • 算法.图论-习题全集(Updating)
  • 【Android、IOS、Flutter、鸿蒙、ReactNative 】自定义View
  • 力扣 LeetCode 513. 找树左下角的值(Day8:二叉树)
  • [服务器] 腾讯云服务器免费体验,成功部署网站
  • PBDL (基于物理的深度学习)-Chapter 1
  • 深度学习day2-Tensor 2
  • 【Git】git从暂存区中移除文件
  • 山泽HDMI切换器:提升家庭娱乐与办公体验的利器
  • 支持向量机SVM——基于分类问题的监督学习算法
  • HBase 原理
  • 蓝桥杯每日真题 - 第19天
  • 第27天 安全开发-PHP应用TP 框架路由访问对象操作内置过滤绕过核心漏洞
  • 生产环境centos8 Red Hat8部署ansible and 一键部署mysql两主两从ansible脚本预告
  • 经验笔记:远端仓库和本地仓库之间的连接(以Gitee为例)
  • 在Sui 区块链上创建、部署和管理 NFT 的完整教程
  • java 设计模式 模板方法模式
  • shell脚本-笔记25