当前位置: 首页 > article >正文

impala与kudu进行集成

文章目录

    • 概要
    • Kudu与Impala整合配置
    • Impala内部表
    • Impala外部表
    • Impala sql操作kudu
    • Impala jdbc操作表
      • 如果使用了Hadoop 使用了Kerberos认证,可使用如下方式进行连接。

概要

  • Impala是一个开源的高效率的SQL查询引擎,用于查询存储在Hadoop分布式文件系统(HDFS)中的大规模数据集。它提供了一个类似于传统关系型数据库的SQL接口,允许用户使用SQL语言查询存储在Hadoop集群中的数据。使用内存进行计算提供实时的SQL查询,impala强依赖于Hive 的MetaStore,直接使用hive的元数据,意味着impala元数据都存储在hive的MetaStore当中,并且impala兼容hive的绝大多数sql语法,具有实时,批处理,多并发等优点。
  • Kudu提供了KuduClient api用于操作kudu数据库,但不支持标准SQL操作,可以将Kudu与Apache Impala紧密集成,impala天然就支持兼容kudu,允许开发人员使用Impala的SQL语法从Kudu的tablets 插入,查询,更新和删除数据,Kudu与Impala整合本质上就是为了可以使用Hive表来操作Kudu,主要支持SQL操作。

Kudu与Impala整合配置

先安装Impala后安装Kudu,Impala默认与Kudu没有形成依赖,这里需要首先在Impala中开启Kudu依赖支持,打开Impala->“配置”->“Kudu服务”:
在这里插入图片描述
以上配置完成之后,重启Impala即可。

Impala内部表

内部表是由Impala自身管理的表,数据存储在Hive元数据库和Kudu中。当删除内部表时,存储在Hive元数据库中的元数据和存储在kudu中的数据都会被删除。
例如:
CREATE TABLE my_table1
(
id BIGINT,
name STRING,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES(
‘kudu.master_addresses’ = ‘cm1:7051’,
‘kudu.table_name’ = ‘my_table1’
);

Impala外部表

外部表则是由KUDU管理的表,元数据存储在Hive元数据库中,但实际数据文件存储在kudu中。删除外部表时,只会删除元数据,实际的数据文件不会被删除。外部表也可以指定数据的存储位置,可以在建表时指定,也可以通过ALTER TABLE语句修改。
使用Kudu client api 在Kudu中创建表test_user,创建好之后。使用下面的sql语句创建外部表。
CREATE EXTERNAL TABLE test_user STORED AS KUDU
TBLPROPERTIES(
‘kudu.table_name’ = ‘test_user’,
‘kudu.master_addresses’ = ‘10.68.18.60:7051’);

Impala sql操作kudu

插入数据
insert into default_vals(id,name,address,age) values (10,“hello1”,‘山东’,22) ;
查询表数据
select * from default_vals;
更新表数据
upsert into default_vals(id,name,address,age) values(102,‘hello2’,‘山东’,22);
删除数据
delete from default_valswhere id = 20;

Impala jdbc操作表

maven 依赖

        <!-- impala的驱动 -->
        <dependency>
            <groupId>com.cloudera.impala.jdbc</groupId>
            <artifactId>ImpalaJDBC42</artifactId>
            <version>2.5.42</version>
            <scope>provided</scope>
        </dependency>

代码示例

package com.example.demo.impala;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ImpalaCrud {

	public static void main(String[] args) {
		System.out.println("begin");
		Connection conn =getConnection();
		queryTable(conn) ;
//		insertTable2(conn) ;
	}

	public static void insertTable2(Connection conn) {
		String insertSql="insert into default_vals( name,age,create_time,update_time,id) values (?,?,?,now(),?)";
		PreparedStatement ps=null;
		try {
			ps=conn.prepareStatement(insertSql);
			ps.setString(1, "张三李四");
			ps.setString(2, "43");
			ps.setTimestamp(3, getCurrentTimestamp());
			ps.setString(4, "102");
			ps.execute();
		} catch (SQLException e) {
			e.printStackTrace();
		}finally {
			if(conn!=null) {
				try {
					conn.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
		}
	}
	public static java.sql.Timestamp  getCurrentTimestamp() {
		java.util.Date date=new java.util.Date();
		java.sql.Timestamp timestamp = new java.sql.Timestamp(date.getTime()); 
		System.out.println(timestamp);
//			java.sql.Date sqlDate=new java.sql.Date(date.getTime());
		return timestamp;
	}
	
	public static void insertTable(Connection conn) {
		String insertSql="insert into default_vals( name,age,create_time,update_time,id) values (?,?,now(),now(),?)";
		PreparedStatement ps=null;
		try {
			ps=conn.prepareStatement(insertSql);
			
			ps.setString(1, "xxxxx1");
			ps.setInt(2, 43);
			ps.setInt(3, 101);
//			ps.setInt(4, 33);
			ps.execute();
		} catch (SQLException e) {
			e.printStackTrace();
		}finally {
			if(conn!=null) {
				try {
					conn.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
		}
	}
	public static void queryTable(Connection conn) {
		String querySql="select * from test_user1";
//		PreparedStatement ps=conn.prepareStatement(querySql);
		Statement st;
		try {
			st = conn.createStatement();
			ResultSet rs=st.executeQuery(querySql);
			while(rs.next()) {
				System.out.print (rs.getString(1));
				System.out.print (rs.getString(2));
				System.out.print (rs.getString(3));
				System.out.println ("    ");
			}
			rs.close();
		} catch (SQLException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}finally {
			if(conn!=null) {
				try {
					conn.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
		}
		
		
	}
	public static Connection getConnection() {
		Connection conn =null;
		try {
			Class.forName("com.cloudera.impala.jdbc.Driver");//指定连接类型 
			String url="jdbc:impala://10.68.18.170:21050/db1;UseSasl=0;AuthMech=0;UID=impala";
//			String url="jdbc:impala://10.3.4.31:21050/ccit_dl_ods";
//			conn = DriverManager.getConnection(url);//获取连接
			conn = DriverManager.getConnection(url,"root","huawei@123");//获取连接
			
		}catch(Exception e) {
			e.printStackTrace();
		}
		return conn;
	}
}

如果使用了Hadoop 使用了Kerberos认证,可使用如下方式进行连接。

package com.example.demo.impala;

import java.security.PrivilegedAction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.hadoop.security.UserGroupInformation;

public class ImpalaKdc {
	private static String driver = "com.cloudera.impala.jdbc.Driver";
	public static void main(String[] args) throws Exception {

		String jdbcUrl="jdbc:impala://cm2:21050/db1;AuthMech=1;KrbRealm=EXAMPLE.COM;KrbHostFQDN=cm2.cdh;KrbServiceName=impala";
		String configPath="E:\\tmp\\krb5.conf";
		String keyTabPath="E:\\tmp\\impala.keytab";
		Connection conn=getImapalaAuthConnection(jdbcUrl,"impala/cm2.cdh",configPath,keyTabPath);
		System.out.println(conn);
		queryTable(conn);
	}
    private static Connection getImapalaAuthConnection(String jdbcUrl,String username,String configPath,String keyTabPath)throws Exception{
//        System.setProperty("java.security.krb5.conf", configPath);
        Connection connection = null;
        try{
            org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); 
            conf.set("hadoop.security.authentication", "Kerberos");
            UserGroupInformation.setConfiguration(conf); 
            UserGroupInformation.loginUserFromKeytab(username, keyTabPath);
            connection = UserGroupInformation.getLoginUser().doAs(new PrivilegedAction<Connection>(){
                @Override
                public Connection run(){
                    Connection connection = null;
                    try{
                        Class.forName(driver);
                        connection = DriverManager.getConnection(jdbcUrl);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                    return connection;
                }
            });
        }catch (Exception e){
            throw e;
        }
        return connection;
    }
	public static void queryTable(Connection conn) {
		String querySql="select * from test_user1";
//		PreparedStatement ps=conn.prepareStatement(querySql);
		Statement st;
		try {
			st = conn.createStatement();
			ResultSet rs=st.executeQuery(querySql);
			while(rs.next()) {
				System.out.print (rs.getString(1));
				System.out.print (rs.getString(2));
				System.out.print (rs.getString(3));
				System.out.println ("    ");
			}
			rs.close();
		} catch (SQLException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}finally {
			if(conn!=null) {
				try {
					conn.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
		}
		
	}
}


http://www.kler.cn/news/232519.html

相关文章:

  • 【linux温故】linux调度机制
  • Ubuntu22.04 gnome-builder gnome C 应用程序习练笔记(二)
  • ArcGISPro中Python相关命令总结
  • 【RPA】智能自动化的未来:AI + RPA
  • 转融通业务是什么?好处和弊端是什么?
  • 全栈笔记_插件篇(用Volar替换Vuter)
  • Redis之基础篇
  • 【算法练习】leetcode算法题合集之其他篇
  • QT基础教程(全系列教程目录)
  • Java学习笔记------API
  • Flink Checkpoint过程
  • 一周学会Django5 Python Web开发-Django5创建项目(用命令方式)
  • Python数据分析 可视化数据Seaborn图表 这篇就够了
  • 【lesson47】进程通信之system V(共享内存)补充知识
  • PgSQL技术内幕 - case when表达式实现机制
  • 【Linux系统学习】3.Linux用户和权限
  • C++2024寒假J312实战班2.6
  • C语言冒泡排序介绍
  • 面试复盘——10
  • C++三剑客之std::any(一) : 使用
  • 【MySQL进阶之路】BufferPool底层设计(下)
  • 【GAMES101】Lecture 19 相机
  • java——学习并推荐java8
  • 猫头虎分享已解决Bug :内存泄漏(Memory Leak)
  • Kubernetes实战(二十七)-HPA实战
  • Web Services 服务 是不是过时了?创建 Web Services 服务实例
  • 【Linux】SystemV IPC
  • CTFshow web(命令执行 41-44)
  • WPF 中 Loaded 和 Closing 窗口事件
  • K210如何下载程序