
Running Flink SQL against ClickHouse

A step-by-step guide: connect Flink to ClickHouse with a Flink connector, then work with ClickHouse (CK) by submitting Flink SQL.

Component versions
JDK 1.8
Flink 1.17.2
ClickHouse 23.12.2.59

1. Background

Flink has no official ClickHouse connector, yet one is often needed in real work.

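2. Approach

Use the open-source connector that a GitHub developer (itinycheng) published. I used the release-1.16 branch: https://github.com/itinycheng/flink-connector-clickhouse/tree/release-1.16 (the branch targets Flink 1.16; in this project it runs with Flink 1.17.2).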
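With the connector on the classpath, a ClickHouse table is declared in Flink SQL by a CREATE TABLE statement whose WITH clause selects the connector. The sketch below assembles such a statement in plain Java. The option keys ('connector' = 'clickhouse', 'url', 'database-name', 'table-name', 'username', 'password', 'sink.batch-size') are based on my reading of the connector's README and should be checked against the release-1.16 branch; the table names, host, port and credentials are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class ClickHouseDdlExample {

    /**
     * Builds a Flink SQL CREATE TABLE statement whose WITH clause points at the
     * ClickHouse connector. Option keys are assumptions based on the connector's
     * README; verify them against the release-1.16 branch.
     */
    static String clickHouseTableDdl(String flinkTable, String columns, Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n", "WITH (\n", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            // Double any single quote so the value stays a valid SQL string literal
            String value = e.getValue().replace("'", "''");
            with.add("  '" + e.getKey() + "' = '" + value + "'");
        }
        return "CREATE TABLE " + flinkTable + " (\n" + columns + "\n) " + with;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order in the output
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "clickhouse");
        options.put("url", "clickhouse://127.0.0.1:8123"); // placeholder host and HTTP port
        options.put("database-name", "default");            // placeholder database
        options.put("table-name", "t_user");                // placeholder ClickHouse table
        options.put("username", "default");                 // placeholder credentials
        options.put("password", "");
        options.put("sink.batch-size", "500");
        String columns = "  user_id BIGINT,\n  user_name STRING";
        System.out.println(clickHouseTableDdl("ck_user", columns, options));
    }
}
```

The printed statement is ordinary Flink SQL DDL, so it can be passed to TableEnvironment.executeSql like any other statement.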

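3. Build

Import the project into IntelliJ IDEA and build it with Maven. This produces flink-connector-clickhouse-1.16.0-SNAPSHOT.jar. Running mvn install also puts the jar in the local Maven repository, so the SNAPSHOT dependency in the pom below can be resolved.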

4. Add the dependency to the Flink project

The Flink project here is a Spring Boot application.

4.1)pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
<!--	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.13</version>
		<relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;
	</parent>-->

	<parent>
		<groupId>com.mit.microgrid</groupId>
		<artifactId>mit-microgrid</artifactId>
		<version>${project.build.version}</version>
	</parent>

	<artifactId>mit-microgrid-flink</artifactId>
	<name>mit-microgrid-flink</name>
	<description>flink connector clickhouse</description>

	<properties>
		<java.version>1.8</java.version>
		<flink.version>1.17.2</flink.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
			<!-- Exclude Spring Boot's default logging dependency -->
			<exclusions>
				<exclusion>
					<groupId>org.springframework.boot</groupId>
					<artifactId>spring-boot-starter-logging</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
		</dependency>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
		</dependency>

		<!--flink-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!--flink connector-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-base</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!--flink connector clickhouse-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-clickhouse</artifactId>
			<version>1.16.0-SNAPSHOT</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
<!--			<artifactId>flink-clients_2.12</artifactId>-->
			<version>${flink.version}</version>
<!--			<scope>provided</scope>-->
		</dependency>
		<!-- flink sql -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-loader</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- Flink JDBC Connector -->
<!--		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_2.12</artifactId>
			<version>1.14.6</version> &lt;!&ndash; must match your Flink version &ndash;&gt;
		</dependency>-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc</artifactId>
			<version>3.1.2-1.17</version>
		</dependency>
		<!-- ClickHouse JDBC Driver -->
		<dependency>
			<groupId>ru.yandex.clickhouse</groupId>
			<artifactId>clickhouse-jdbc</artifactId>
			<version>0.3.2</version> <!-- choose the latest stable version that fits your environment -->
		</dependency>
		<!-- ClickHouse sink Maven dependency -->
		<dependency>
			<groupId>ru.ivi.opensource</groupId>
			<artifactId>flink-clickhouse-sink</artifactId>
			<version>1.2.0</version>
		</dependency>

		<!--module-->
		<dependency>
			<groupId>com.mit.microgrid</groupId>
			<artifactId>mit-microgrid-common-core</artifactId>
			<version>${project.build.version}</version>
		</dependency>
		<dependency>
			<groupId>com.mit.microgrid</groupId>
			<artifactId>mit-microgrid-api-history</artifactId>
			<version>${project.build.version}</version>
		</dependency>

		<!--sql parse-->
		<dependency>
			<groupId>org.apache.calcite</groupId>
			<artifactId>calcite-core</artifactId>
			<version>1.37.0</version>
		</dependency>
<!--		<dependency>
			<groupId>org.apache.calcite</groupId>
			<artifactId>calcite-server</artifactId>
			<version>1.37.0</version>
		</dependency>-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-parser</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!--mysql-->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.30</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-uber</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-text</artifactId>
			<version>1.12.0</version>
		</dependency>
	</dependencies>

	<build>
		<!--<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
					</execution>
				</executions>
			</plugin>-->
<!--			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.2.4</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers combine.children="append">
								<transformer
										implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>-->
<!--		</plugins>-->

		<finalName>${project.artifactId}</finalName>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<version>2.7.3</version>
				<configuration>
					<mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
					<fork>true</fork>
					<layout>ZIP</layout>
					<includeSystemScope>true</includeSystemScope>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
						<configuration>
							<classifier>-with-dependencies</classifier>
						</configuration>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>3.3.0</version>
				<configuration>
					<archive>
						<addMavenDescriptor>false</addMavenDescriptor>
						<manifest>
							<mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
							<addClasspath>true</addClasspath>
							<classpathPrefix>lib/</classpathPrefix>
						</manifest>
					</archive>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>3.3.0</version>
				<configuration>
					<descriptors>
						<descriptor>src/main/resources/assembly/assembly.xml</descriptor>
					</descriptors>
					<outputDirectory>./../out</outputDirectory>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>

</project>
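Flink discovers table connectors through Java's ServiceLoader, which reads META-INF/services/org.apache.flink.table.factories.Factory from every jar on the classpath. If that entry is lost during packaging (for example in a shaded jar where service files overwrite each other, the problem the commented-out ServicesResourceTransformer addresses), the job fails with an error saying no factory for identifier 'clickhouse' was found. The sketch below lists the registered factory classes so the packaged application can be checked; matching on the substring "clickhouse" is an assumption about the connector's factory class name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

public class FactorySpiCheck {

    // SPI file through which Flink discovers table connector factories
    static final String FACTORY_SPI = "META-INF/services/org.apache.flink.table.factories.Factory";

    /** Collects every factory class name registered in any SPI file visible to the loader. */
    static List<String> registeredFactories(ClassLoader loader) throws IOException {
        List<String> names = new ArrayList<>();
        Enumeration<URL> files = loader.getResources(FACTORY_SPI);
        while (files.hasMoreElements()) {
            URL url = files.nextElement();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // SPI files allow '#' comments; keep only the class name part
                    int hash = line.indexOf('#');
                    String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
                    if (!name.isEmpty()) {
                        names.add(name);
                    }
                }
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        List<String> names = registeredFactories(Thread.currentThread().getContextClassLoader());
        boolean found = false;
        for (String n : names) {
            System.out.println(n);
            // Assumption: the connector's factory class name contains "ClickHouse"
            if (n.toLowerCase().contains("clickhouse")) {
                found = true;
            }
        }
        System.out.println(found ? "ClickHouse factory registered" : "ClickHouse factory NOT found");
    }
}
```

Running this inside the packaged application (rather than from the IDE) is what matters, since the IDE classpath does not reflect how the jars were repackaged.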

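4.2) Core method:

The method runs the SQL statements in order in a batch-mode TableEnvironment and returns the JobClient of the first statement that submits a Flink job.

import java.util.List;
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.util.CollectionUtil;

@Slf4j // logger generated by Lombok (lombok is declared in the pom)
public class FlinkSqlExecutor { // class name is illustrative

    /**
     * Executes multiple SQL statements in order, e.g. CREATE TABLE DDL followed by INSERT INTO.
     *
     * @param sqlList SQL statements to execute, one statement per element
     * @return the JobClient of the first statement that submits a job, or null if none does
     */
    public static JobClient flinkSqlJobClientMultiple(List<String> sqlList) {
        log.info("sqlList parameter: {}", sqlList);
        // Validate the input before creating the TableEnvironment
        if (CollectionUtil.isNullOrEmpty(sqlList)) {
            log.warn("sqlList is empty");
            return null;
        }
        // Streaming alternative:
        // StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(setting);
        for (String s : sqlList) {
            // DDL such as CREATE TABLE returns no JobClient; DML such as INSERT submits a job
            TableResult tableResult = tEnv.executeSql(s);
            Optional<JobClient> jobClientOptional = tableResult.getJobClient();
            if (jobClientOptional.isPresent()) {
                JobClient jobClient = jobClientOptional.get();
                log.info("jobClient: {}", jobClient);
                // Returns immediately: statements after the first job-submitting one are not executed
                return jobClient;
            }
        }
        log.error("No executable job found");
        return null;
    }
}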
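TableEnvironment.executeSql runs a single statement per call, which is why the method takes a List<String>. When the SQL is kept as one script, it has to be split first. Below is a simplified splitter (my own helper, not a Flink API) that breaks on semicolons outside single-quoted literals; it does not handle comments or quoted identifiers. The sample script uses a bounded datagen source ('number-of-rows') because the method runs in batch mode; the ClickHouse options and table names are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

public class SqlScriptSplitter {

    /**
     * Splits a script on semicolons outside single-quoted literals.
     * Simplified: comments and double-quoted/backticked identifiers are not handled.
     */
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuote = false;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (c == '\'') {
                // An escaped quote ('') toggles twice, so the literal state is preserved
                inQuote = !inQuote;
            }
            if (c == ';' && !inQuote) {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current);
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String s = sb.toString().trim();
        if (!s.isEmpty()) {
            out.add(s);
        }
    }

    public static void main(String[] args) {
        String script =
                "CREATE TABLE src (id BIGINT) WITH ('connector' = 'datagen', 'number-of-rows' = '10');\n"
              + "CREATE TABLE ck_sink (id BIGINT) WITH ('connector' = 'clickhouse', "
              + "'url' = 'clickhouse://127.0.0.1:8123', 'database-name' = 'default', 'table-name' = 't');\n"
              + "INSERT INTO ck_sink SELECT id FROM src;";
        List<String> sqlList = split(script);
        sqlList.forEach(System.out::println);
        // sqlList can then be passed to flinkSqlJobClientMultiple(sqlList) from section 4.2
    }
}
```

Because the method returns as soon as one statement submits a job, a script like this should put all DDL first and a single INSERT last.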

5. Source code

https://github.com/genghongsheng0/mit-microgrid-flink


