大数据技术(八)—— HBase数据读写流程和Api的使用
目录
一、读数据
二、写数据
1、写数据
2、数据刷到磁盘过程
3、数据合并过程
三、Api的使用
四、Phoenix
1、安装Phoenix
2、简单使用
2.1、查看表信息
2.2、创建表
2.3、插入数据
2.4、查看数据
2.5、修改数据
2.6、删除数据
2.7、退出
2.8、导入数据
3、API的使用
五、整合SpringBoot+MyBatis
六、参考
一、读数据
- Client先访问zookeeper,请求获取Meta表信息。从上面图可以看到Zookeeper中存储了HBase的元数据信息。
- Zookeeper返回meta数据信息。
- 根据namespace、表名和rowkey在meta表的信息去获取数据所在的RegionServer和Region
- 找到这个region对应的regionserver;
- 查找对应的region;
- 先从MemStore找数据,如果没有,再到BlockCache里面读;
- BlockCache还没有,再到StoreFile上读(为了读取的效率);
- 如果是从StoreFile里面读取的数据,不是直接返回给客户端,而是先写入BlockCache,再返回给客户端。
二、写数据
1、写数据
写数据和读数据前几步的过程都是一样的,首先从Zookeeper获取Meta表信息,根据meta表信息确认RegionServer 和Region
- Client向RegionServer发送写请求。
- RegionServer将数据写到HLog(write ahead log)。为了数据的持久化和恢复(避免数据在内存还没刷到磁盘就宕机丢失数据)。
- RegionServer将数据写到内存(MemStore),此时就认为写数据完成了,至于什么时候数据写进HDFS,它不关心。
- 反馈Client写成功。
2、数据刷到磁盘过程
当MemStore数据达到阈值(默认是128M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog(Write Ahead Log)中的历史数据;并将数据存储到HDFS中;在HLog中做标记点。
3、数据合并过程
- 当数据块达到3块,Hmaster触发合并操作,Region将数据块加载到本地,进行合并。
- 当合并的数据超过256M,进行拆分,将拆分后的Region分配给不同的HregionServer管理。
- 当RegionServer宕机后,将RegionServer上的hlog拆分,然后分配给不同的RegionServer加载,修改.META。
- HLog的数据会同步到HDFS,保证数据的可靠性。
三、Api的使用
package com.xiaojie.hadoop.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.quotas.QuotaType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import java.io.IOException;
import java.util.List;
/**
* @author 熟透的蜗牛
* @version 1.0
* @description: hbase工具类
* @date 2025/1/3 9:27
*/
@Slf4j
public class HBaseUtil {
private static Connection connection;
static {
Configuration configuration = HBaseConfiguration.create();
//设置端口号
configuration.set("hbase.zookeeper.property.clientPort", "2181");
//设置zk连接
configuration.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
//创建连接
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* @param namespace
* @description: 创建namespace
* @return: boolean
* @author 熟透的蜗牛
* @date: 2025/1/3 11:03
*/
public static boolean createNameSpace(String namespace) {
try {
Admin admin = connection.getAdmin();
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
admin.createNamespace(namespaceDescriptor);
log.info(">>>>>>>>>>>>创建namespace成功");
return true;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* @param namespace
* @description: 删除ns
* @return: boolean
* @author 熟透的蜗牛
* @date: 2025/1/3 11:05
*/
public static boolean deleteNameSpace(String namespace) {
try {
Admin admin = connection.getAdmin();
admin.deleteNamespace(namespace);
log.info(">>>>>>>>>>>>删除namespace成功");
return true;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* @param tableName 表名
* @param columnFamilies 列族
* @description: 创建表
* @return: boolean
* @author 熟透的蜗牛
* @date: 2025/1/3 9:35
*/
public static boolean createTable(String tableName, List<String> columnFamilies, String nameSpace) throws IOException {
Admin admin = connection.getAdmin();
boolean exists = admin.tableExists(TableName.valueOf(tableName));
//创建表
if (!exists) {
//如果namespace是空值则会使用default,作为命名空间
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(nameSpace, tableName));
columnFamilies.forEach(cf -> {
ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf));
columnFamilyDescriptorBuilder.setMaxVersions(1);
ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptorBuilder.build();
tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
});
admin.createTable(tableDescriptorBuilder.build());
return true;
} else {
log.info("table exists>>>>>>>>");
return false;
}
}
/**
* @param tableName 表名
* @description: 删除表
* @return: boolean
* @author 熟透的蜗牛
* @date: 2025/1/3 10:16
*/
public static boolean deleteTable(String tableName) {
try {
Admin admin = connection.getAdmin();
//先禁用表
admin.disableTable(TableName.valueOf(tableName));
//再删除
admin.deleteTable(TableName.valueOf(tableName));
return true;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* @param tableName 表名
* @param rowKey rowkey
* @param columnFamilyName 列族
* @param qualifier 列标识
* @param value 数据
* @description: 插入数据
* @return: boolean
* @author 熟透的蜗牛
* @date: 2025/1/3 16:46
*/
public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName, String qualifier,
String value) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(nameSpace, tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
table.put(put);
log.info(">>>>>>>插入数据成功");
return true;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
/**
* @param nameSpace
* @param tableName
* @param rowKey
* @param columnFamilyName
* @param pairList 键值对集合
* @description: 插入数据
* @return: boolean
* @author 熟透的蜗牛
* @date: 2025/1/3 17:32
*/
public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(nameSpace, tableName));
Put put = new Put(Bytes.toBytes(rowKey));
pairList.forEach(pair -> {
put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getFirst()), Bytes.toBytes(pair.getSecond()));
});
table.put(put);
log.info(">>>>>>>插入数据成功");
return true;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
/**
* @param nameSpace
* @param tableName
* @param rowKey
* @description:根据Rowkey查询
* @return: org.apache.hadoop.hbase.client.Result
* @author 熟透的蜗牛
* @date: 2025/1/3 17:42
*/
public static Result getRowByRowKey(String nameSpace, String tableName, String rowKey) {
try {
Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
Result result = table.get(new Get(Bytes.toBytes(rowKey)));
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* @param nameSpace
* @param tableName
* @description: 查询所有数据, 和范围查询
* @return: org.apache.hadoop.hbase.client.ResultScanner
* @author 熟透的蜗牛
* @date: 2025/1/3 18:12
*/
public static ResultScanner getAll(String nameSpace, String tableName) {
try {
Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
Scan scan = new Scan();
scan.setCacheBlocks(true);//设置读缓存
scan.withStartRow(Bytes.toBytes("1002")); //rowkey的起始值
scan.withStopRow(Bytes.toBytes("1003")); //rowkey的结束值,返回结果不包含该值
ResultScanner scanner = table.getScanner(scan);
return scanner;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* @param nameSpace
* @param tableName
* @param filterList
* @description: 查找过滤
* @return: org.apache.hadoop.hbase.client.ResultScanner
* @author 熟透的蜗牛
* @date: 2025/1/3 20:47
*/
public static ResultScanner getScanner(String nameSpace, String tableName, FilterList filterList) {
try {
Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
Scan scan = new Scan();
scan.setFilter(filterList);
ResultScanner scanner = table.getScanner(scan);
return scanner;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* @param tableName
* @param rowKey
* @param columnFamily 列族
* @param qualifier 限定符
* @description: 获取指定cell数据
* @return: java.lang.String
* @author 熟透的蜗牛
* @date: 2025/1/3 20:59
*/
public static String getCell(String nameSpace, String tableName, String rowKey, String columnFamily, String qualifier) {
try {
Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
Get get = new Get(Bytes.toBytes(rowKey));
if (!get.isCheckExistenceOnly()) {
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
Result result = table.get(get);
byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
return Bytes.toString(resultValue);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
}
/**
* @param nameSpace
* @param tableName
* @param rowKey
* @description: 删除一行数据
* @return: boolean
* @author 熟透的蜗牛
* @date: 2025/1/3 21:34
*/
public static boolean deleteRow(String nameSpace, String tableName, String rowKey) {
try {
Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
/**
* @param nameSpace
* @param tableName
* @param rowKey
* @param familyName
* @param qualifier
* @description: 删除指定列
* @return: boolean
* @author 熟透的蜗牛
* @date: 2025/1/3 21:34
*/
public static boolean deleteColumn(String nameSpace, String tableName, String rowKey, String familyName,
String qualifier) {
try {
Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
table.delete(delete);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
}
四、Phoenix
Phoenix是 HBase 的开源 SQL 中间层,它允许你使用标准 JDBC 的方式来操作 HBase 上的数据。
1、安装Phoenix
#解压文件
tar -zxf phoenix-hbase-2.6-5.2.1-bin.tar.gz
#移动文件
mv phoenix-hbase-2.6-5.2.1-bin /usr/local/
#复制jar到regionserver 和master的lib
cp phoenix-server-hbase-2.6.jar /usr/local/hbase-2.6.1/lib
#分发到其他服务器
xsync /usr/local/hbase-2.6.1/
#重启hbase
./hbase.sh stop
./hbase.sh start
#启动phoenix
/usr/local/phoenix-hbase-2.6-5.2.1-bin/bin/sqlline.py hadoop1,hadoop2,hadoop3
2、简单使用
2.1、查看表信息
#查看表信息
!tables
2.2、创建表
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
2.3、插入数据
#插入数据
UPSERT INTO us_population VALUES('NY','New York',8143197);
2.4、查看数据
select * from us_population;
2.5、修改数据
UPSERT INTO us_population VALUES('NY','New York',999999);
2.6、删除数据
DELETE FROM us_population WHERE city='New York';
#删除表
drop table us_population;
2.7、退出
!quit
2.8、导入数据
us_population.csv
NY,New York,8143197
CA,Los Angeles,3844829
IL,Chicago,2842518
TX,Houston,2016582
PA,Philadelphia,1463281
AZ,Phoenix,1461575
TX,San Antonio,1256509
CA,San Diego,1255540
TX,Dallas,1213825
CA,San Jose,912332
us_population.sql
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
#导入数据
./psql.py hadoop1,hadoop2,hadoop3 us_population.sql us_population.csv
#查看数据
select * from us_population;
#查询数据
SELECT state as "State",count(city) as "City Count",sum(population) as "Population Sum"
FROM us_population
GROUP BY state
ORDER BY sum(population) DESC;
3、API的使用
package com.xiaojie;
import java.sql.*;
/**
* @author 熟透的蜗牛
* @version 1.0
* @description: 测试phoenix
* @date 2025/1/3 23:08
*/
public class PhoenixApi {
public static void main(String[] args) throws ClassNotFoundException, SQLException {
//1、加载驱动
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
//2、创建连接
String url = "jdbc:phoenix:hadoop1,hadoop2,hadoop3";
Connection connect = DriverManager.getConnection(url);
//3、创建查询
PreparedStatement preparedStatement = connect.prepareStatement("SELECT * FROM us_population");
//4、遍历结果
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()) {
System.out.println(resultSet.getString("city") + " "
+ resultSet.getInt("population"));
}
//5、关闭资源
preparedStatement.close();
resultSet.close();
connect.close();
}
}
五、整合SpringBoot+MyBatis
package com.xiaojie.hadoop.hbase.phoenix.dao;
import com.xiaojie.hadoop.hbase.phoenix.bean.USPopulation;
import org.apache.ibatis.annotations.*;
import java.util.List;
@Mapper
public interface PopulationDao {
@Select("SELECT * from us_population")
List<USPopulation> queryAll();
@Insert("UPSERT INTO us_population VALUES( #{state}, #{city}, #{population} )")
void save(USPopulation USPopulation);
@Select("SELECT * FROM us_population WHERE state=#{state} AND city = #{city}")
USPopulation queryByStateAndCity(String state, String city);
@Delete("DELETE FROM us_population WHERE state=#{state} AND city = #{city}")
void deleteByStateAndCity(String state, String city);
}
server:
port: 9999
spring:
application:
name: hadoop
datasource:
#zookeeper地址
url: jdbc:phoenix:hadoop1,hadoop2,hadoop3
driver-class-name: org.apache.phoenix.jdbc.PhoenixDriver
type: com.zaxxer.hikari.HikariDataSource
hikari:
# 池中维护的最小空闲连接数
minimum-idle: 10
# 池中最大连接数,包括闲置和使用中的连接
maximum-pool-size: 20
# 此属性控制从池返回的连接的默认自动提交行为。默认为true
auto-commit: true
# 允许最长空闲时间
idle-timeout: 30000
# 此属性表示连接池的用户定义名称,主要显示在日志记录和JMX管理控制台中,以标识池和池配置。 默认值:自动生成
pool-name: custom-hikari
#此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
max-lifetime: 1800000
# 数据库连接超时时间,默认30秒,即30000
connection-timeout: 30000
# 连接测试sql 这个地方需要根据数据库方言差异而配置 例如 oracle 就应该写成 select 1 from dual
connection-test-query: SELECT 1
wssnail:
hdfs:
url: hdfs://hadoop1:8020
user-name: root
# mybatis 相关配置
mybatis:
configuration:
# 是否打印sql语句 调试的时候可以开启
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
六、参考
https://phoenix.apache.org/Phoenix-in-15-minutes-or-less.html
完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码 - Gitee.com