Java通过calcite实时读取kafka中的数据
引入maven依赖
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-kafka</artifactId>
<version>1.28.0</version>
</dependency>
测试代码
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Properties;
public class CalciteDemo {
public static void main(String[] args) throws SQLException {
String model = "inline:" +
"{\n" +
" \"version\": \"1.0\",\n" +
" \"defaultSchema\": \"KAFKA\",\n" +
" \"schemas\": [\n" +
" {\n" +
" \"name\": \"KAFKA\",\n" +
" \"tables\": [\n" +
" {\n" +
" \"name\": \"TEST_TABLE\",\n" +
" \"factory\": \"org.apache.calcite.adapter.kafka.KafkaTableFactory\",\n" +
" \"stream\": { \"stream\": true },\n" +
" \"operand\": {\n" +
" \"bootstrap.servers\": \"192.168.x.xx:9092\",\n" +
" \"topic.name\": \"my-cloud-events\",\n" +
" \"consumer.params\": {\n" +
" \"group.id\": \"calcite-ut-consumer\",\n" +
" \"key.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\n" +
" \"value.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\"\n" +
" }\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
"}";
Properties info = new Properties();
info.put("model", model);
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
final String sql7 = "SELECT STREAM * FROM \"KAFKA\".\"TEST_TABLE\"";
print(calciteConnection,sql7);
connection.close();
calciteConnection.close();
}
public static void print(CalciteConnection calciteConnection, String sql7) throws SQLException {
final PreparedStatement statement = calciteConnection.prepareStatement(sql7);
final ResultSet resultSet = statement.executeQuery();
ResultSetMetaData metadata = resultSet.getMetaData();
while (resultSet.next()) {
for (int i = 1; i <= metadata.getColumnCount(); i++) {
System.out.print(metadata.getColumnLabel(i) + "=" + resultSet.getString(i) + ",");
}
System.out.println();
}
}
}