当前位置: 首页 > article >正文

大数据技术(八)—— HBase数据读写流程和Api的使用

目录

一、读数据

二、写数据

1、写数据

2、数据刷到磁盘过程

3、数据合并过程

三、Api的使用

四、Phoenix

1、安装Phoenix

2、简单使用

2.1、查看表信息

2.2、创建表

2.3、插入数据

2.4、查看数据

2.5、修改数据

2.6、删除数据

2.7、退出

2.8、导入数据

3、API的使用

五、整合SpringBoot+MyBatis

六、参考


一、读数据

  1. Client先访问zookeeper,请求获取Meta表信息。从上面图可以看到Zookeeper中存储了HBase的元数据信息。
  2. Zookeeper返回meta数据信息。
  3. 根据namespace、表名和rowkey在meta表的信息去获取数据所在的RegionServer和Region
  4. 找到这个region对应的regionserver;
  5. 查找对应的region;
  6. 先从MemStore找数据,如果没有,再到BlockCache里面读;
  7. BlockCache还没有,再到StoreFile上读(为了读取的效率);
  8. 如果是从StoreFile里面读取的数据,不是直接返回给客户端,而是先写入BlockCache,再返回给客户端。

二、写数据

1、写数据

写数据和读数据前几步的过程都是一样的,首先从Zookeeper获取Meta表信息,根据meta表信息确认RegionServer 和Region

  1. Client向RegionServer发送写请求。
  2. RegionServer将数据写到HLog(write ahead log)。为了数据的持久化和恢复(避免数据在内存还没刷到磁盘就宕机丢失数据)。
  3. RegionServer将数据写到内存(MemStore),此时就认为写数据完成了,至于什么时候数据写进HDFS,它不关心。
  4. 反馈Client写成功。

2、数据刷到磁盘过程

当MemStore数据达到阈值(默认是128M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog(Write Ahead Log)中的历史数据;并将数据存储到HDFS中;在HLog中做标记点。

3、数据合并过程

  1. 当数据块达到3块,Hmaster触发合并操作,Region将数据块加载到本地,进行合并。
  2. 当合并的数据超过256M,进行拆分,将拆分后的Region分配给不同的HregionServer管理。
  3. 当RegionServer宕机后,将RegionServer上的hlog拆分,然后分配给不同的RegionServer加载,修改.META。
  4. HLog的数据会同步到HDFS,保证数据的可靠性。

三、Api的使用

package com.xiaojie.hadoop.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.quotas.QuotaType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

import java.io.IOException;
import java.util.List;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: hbase工具类
 * @date 2025/1/3 9:27
 */
@Slf4j
public class HBaseUtil {

    private static Connection connection;

    static {
        Configuration configuration = HBaseConfiguration.create();
        //设置端口号
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        //设置zk连接
        configuration.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
        //创建连接
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }


    /**
     * @param namespace
     * @description: 创建namespace
     * @return: boolean
     * @author 熟透的蜗牛
     * @date: 2025/1/3 11:03
     */
    public static boolean createNameSpace(String namespace) {
        try {
            Admin admin = connection.getAdmin();
            NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
            admin.createNamespace(namespaceDescriptor);
            log.info(">>>>>>>>>>>>创建namespace成功");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @param namespace
     * @description: 删除ns
     * @return: boolean
     * @author 熟透的蜗牛
     * @date: 2025/1/3 11:05
     */
    public static boolean deleteNameSpace(String namespace) {
        try {
            Admin admin = connection.getAdmin();
            admin.deleteNamespace(namespace);
            log.info(">>>>>>>>>>>>删除namespace成功");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @param tableName      表名
     * @param columnFamilies 列族
     * @description: 创建表
     * @return: boolean
     * @author 熟透的蜗牛
     * @date: 2025/1/3 9:35
     */
    public static boolean createTable(String tableName, List<String> columnFamilies, String nameSpace) throws IOException {
        Admin admin = connection.getAdmin();
        boolean exists = admin.tableExists(TableName.valueOf(tableName));
        //创建表
        if (!exists) {
            //如果namespace是空值则会使用default,作为命名空间
            TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(nameSpace, tableName));
            columnFamilies.forEach(cf -> {
                ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf));
                columnFamilyDescriptorBuilder.setMaxVersions(1);
                ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptorBuilder.build();
                tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
            });
            admin.createTable(tableDescriptorBuilder.build());
            return true;
        } else {
            log.info("table exists>>>>>>>>");
            return false;
        }
    }

    /**
     * @param tableName 表名
     * @description: 删除表
     * @return: boolean
     * @author 熟透的蜗牛
     * @date: 2025/1/3 10:16
     */
    public static boolean deleteTable(String tableName) {
        try {
            Admin admin = connection.getAdmin();
            //先禁用表
            admin.disableTable(TableName.valueOf(tableName));
            //再删除
            admin.deleteTable(TableName.valueOf(tableName));
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @param tableName        表名
     * @param rowKey           rowkey
     * @param columnFamilyName 列族
     * @param qualifier        列标识
     * @param value            数据
     * @description: 插入数据
     * @return: boolean
     * @author 熟透的蜗牛
     * @date: 2025/1/3 16:46
     */
    public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName, String qualifier,
                                 String value) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(nameSpace, tableName));
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            log.info(">>>>>>>插入数据成功");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * @param nameSpace
     * @param tableName
     * @param rowKey
     * @param columnFamilyName
     * @param pairList         键值对集合
     * @description: 插入数据
     * @return: boolean
     * @author 熟透的蜗牛
     * @date: 2025/1/3 17:32
     */
    public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) {

        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(nameSpace, tableName));
            Put put = new Put(Bytes.toBytes(rowKey));
            pairList.forEach(pair -> {
                put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getFirst()), Bytes.toBytes(pair.getSecond()));
            });
            table.put(put);
            log.info(">>>>>>>插入数据成功");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * @param nameSpace
     * @param tableName
     * @param rowKey
     * @description:根据Rowkey查询
     * @return: org.apache.hadoop.hbase.client.Result
     * @author 熟透的蜗牛
     * @date: 2025/1/3 17:42
     */
    public static Result getRowByRowKey(String nameSpace, String tableName, String rowKey) {
        try {
            Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @param nameSpace
     * @param tableName
     * @description: 查询所有数据, 和范围查询
     * @return: org.apache.hadoop.hbase.client.ResultScanner
     * @author 熟透的蜗牛
     * @date: 2025/1/3 18:12
     */
    public static ResultScanner getAll(String nameSpace, String tableName) {
        try {
            Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
            Scan scan = new Scan();
            scan.setCacheBlocks(true);//设置读缓存
            scan.withStartRow(Bytes.toBytes("1002")); //rowkey的起始值
            scan.withStopRow(Bytes.toBytes("1003"));  //rowkey的结束值,返回结果不包含该值
            ResultScanner scanner = table.getScanner(scan);
            return scanner;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @param nameSpace
     * @param tableName
     * @param filterList
     * @description: 查找过滤
     * @return: org.apache.hadoop.hbase.client.ResultScanner
     * @author 熟透的蜗牛
     * @date: 2025/1/3 20:47
     */
    public static ResultScanner getScanner(String nameSpace, String tableName, FilterList filterList) {
        try {
            Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
            Scan scan = new Scan();
            scan.setFilter(filterList);
            ResultScanner scanner = table.getScanner(scan);
            return scanner;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @param tableName
     * @param rowKey
     * @param columnFamily 列族
     * @param qualifier    限定符
     * @description: 获取指定cell数据
     * @return: java.lang.String
     * @author 熟透的蜗牛
     * @date: 2025/1/3 20:59
     */
    public static String getCell(String nameSpace, String tableName, String rowKey, String columnFamily, String qualifier) {
        try {
            Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            if (!get.isCheckExistenceOnly()) {
                get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
                Result result = table.get(get);
                byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
                return Bytes.toString(resultValue);
            }

        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    }

    /**
     * @param nameSpace
     * @param tableName
     * @param rowKey
     * @description: 删除一行数据
     * @return: boolean
     * @author 熟透的蜗牛
     * @date: 2025/1/3 21:34
     */
    public static boolean deleteRow(String nameSpace, String tableName, String rowKey) {
        try {
            Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            table.delete(delete);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return true;
    }

    /**
     * @param nameSpace
     * @param tableName
     * @param rowKey
     * @param familyName
     * @param qualifier
     * @description: 删除指定列
     * @return: boolean
     * @author 熟透的蜗牛
     * @date: 2025/1/3 21:34
     */
    public static boolean deleteColumn(String nameSpace, String tableName, String rowKey, String familyName,
                                       String qualifier) {
        try {
            Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
            table.delete(delete);
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return true;
    }


}

四、Phoenix

Phoenix是 HBase 的开源 SQL 中间层,它允许你使用标准 JDBC 的方式来操作 HBase 上的数据。

1、安装Phoenix

#解压文件
tar -zxf phoenix-hbase-2.6-5.2.1-bin.tar.gz 

#移动文件
mv phoenix-hbase-2.6-5.2.1-bin /usr/local/

#复制jar到regionserver 和master的lib
cp phoenix-server-hbase-2.6.jar /usr/local/hbase-2.6.1/lib 

#分发到其他服务器
xsync /usr/local/hbase-2.6.1/

#重启hbase
./hbase.sh stop
./hbase.sh start

#启动phoenix
/usr/local/phoenix-hbase-2.6-5.2.1-bin/bin/sqlline.py hadoop1,hadoop2,hadoop3

2、简单使用

2.1、查看表信息

#查看表信息
!tables

2.2、创建表

CREATE TABLE IF NOT EXISTS us_population (
      state CHAR(2) NOT NULL,
      city VARCHAR NOT NULL,
      population BIGINT
      CONSTRAINT my_pk PRIMARY KEY (state, city));

2.3、插入数据

#插入数据
UPSERT INTO us_population VALUES('NY','New York',8143197);

2.4、查看数据

select * from us_population;

2.5、修改数据

UPSERT INTO us_population VALUES('NY','New York',999999);

2.6、删除数据

DELETE FROM us_population WHERE city='New York';

#删除表
drop table us_population;

2.7、退出

!quit

2.8、导入数据

us_population.csv

NY,New York,8143197
CA,Los Angeles,3844829
IL,Chicago,2842518
TX,Houston,2016582
PA,Philadelphia,1463281
AZ,Phoenix,1461575
TX,San Antonio,1256509
CA,San Diego,1255540
TX,Dallas,1213825
CA,San Jose,912332

us_population.sql

CREATE TABLE IF NOT EXISTS us_population (
  state CHAR(2) NOT NULL,
  city VARCHAR NOT NULL,
  population BIGINT
  CONSTRAINT my_pk PRIMARY KEY (state, city));
#导入数据
./psql.py hadoop1,hadoop2,hadoop3 us_population.sql us_population.csv

#查看数据
select * from us_population;

#查询数据
SELECT state as "State",count(city) as "City Count",sum(population) as "Population Sum"
FROM us_population
GROUP BY state
ORDER BY sum(population) DESC;

3、API的使用

package com.xiaojie;

import java.sql.*;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: 测试phoenix
 * @date 2025/1/3 23:08
 */
public class PhoenixApi {

    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        //1、加载驱动
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");

        //2、创建连接
        String url = "jdbc:phoenix:hadoop1,hadoop2,hadoop3";
        Connection connect = DriverManager.getConnection(url);
        //3、创建查询
        PreparedStatement preparedStatement = connect.prepareStatement("SELECT * FROM us_population");

        //4、遍历结果
        ResultSet resultSet = preparedStatement.executeQuery();
        while (resultSet.next()) {
            System.out.println(resultSet.getString("city") + " "
                               + resultSet.getInt("population"));
        }
        //5、关闭资源
        preparedStatement.close();
        resultSet.close();
        connect.close();
    }
}

五、整合SpringBoot+MyBatis

package com.xiaojie.hadoop.hbase.phoenix.dao;

import com.xiaojie.hadoop.hbase.phoenix.bean.USPopulation;
import org.apache.ibatis.annotations.*;

import java.util.List;

@Mapper
public interface PopulationDao {

    @Select("SELECT * from us_population")
    List<USPopulation> queryAll();

    @Insert("UPSERT INTO us_population VALUES( #{state}, #{city}, #{population} )")
    void save(USPopulation USPopulation);

    @Select("SELECT * FROM us_population WHERE state=#{state} AND city = #{city}")
    USPopulation queryByStateAndCity(String state, String city);


    @Delete("DELETE FROM us_population WHERE state=#{state} AND city = #{city}")
    void deleteByStateAndCity(String state, String city);

}
server:
  port: 9999
spring:
  application:
    name: hadoop
  datasource:
    #zookeeper地址
    url: jdbc:phoenix:hadoop1,hadoop2,hadoop3
    driver-class-name: org.apache.phoenix.jdbc.PhoenixDriver
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      # 池中维护的最小空闲连接数
      minimum-idle: 10
      # 池中最大连接数,包括闲置和使用中的连接
      maximum-pool-size: 20
      # 此属性控制从池返回的连接的默认自动提交行为。默认为true
      auto-commit: true
      # 允许最长空闲时间
      idle-timeout: 30000
      # 此属性表示连接池的用户定义名称,主要显示在日志记录和JMX管理控制台中,以标识池和池配置。 默认值:自动生成
      pool-name: custom-hikari
      #此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
      max-lifetime: 1800000
      # 数据库连接超时时间,默认30秒,即30000
      connection-timeout: 30000
      # 连接测试sql 这个地方需要根据数据库方言差异而配置 例如 oracle 就应该写成  select 1 from dual
      connection-test-query: SELECT 1
wssnail:
  hdfs:
    url: hdfs://hadoop1:8020
    user-name: root
# mybatis 相关配置
mybatis:
  configuration:
    # 是否打印sql语句 调试的时候可以开启
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

六、参考

https://phoenix.apache.org/Phoenix-in-15-minutes-or-less.html

完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码 - Gitee.com


http://www.kler.cn/a/471613.html

相关文章:

  • uniapp 导入uview-plus,使用组件出现,页面出现<up-parse>元素不存在,请检查你的代码
  • Blazor用户身份验证状态详解
  • Elasticsearch分片数量是什么意思?
  • .net core 为什么使用 null!
  • 23.行号没有了怎么办 滚动条没有了怎么办 C#例子
  • linux-27 发行版以及跟内核的关系
  • uniapp打包到宝塔并发布
  • 使用python将自己的程序封装成API
  • 使用Python实现医疗物联网设备:构建高效医疗监测系统
  • 快速排序进阶版(加入插入排序提高其性能)
  • 【代码随想录】刷题记录(93)-无重叠区间
  • Requests-数据解析bs4+xpath
  • UWB实操:用信号分析仪(频谱分析仪)抓取UWB频域的图像
  • 【JMeter】多接口关联
  • es 3期 第22节-Bucket特殊分桶聚合实战
  • 【往届已EI检索】第五届智慧城市工程与公共交通国际学术会议(SCEPT 2025)
  • 在 PhpStorm 中配置命令行直接运行 PHP 的步骤
  • 后端开发入门超完整速成路线(算法篇)
  • 计算机网络:无线网络
  • 矩阵和向量点乘叉乘元素乘
  • ue5 替换角色的骨骼网格体和动画蓝图
  • 计算机网络之---计算机网络的性能评估
  • Redis中的主从/Redis八股
  • 信息安全:Java自定义Jackson序列化器进行数据脱敏
  • 如何在新窗口打开pdf文件,并修改网页标题
  • 【前端系列02】Pinia状态管理库