【大数据学习 | HBASE高级】hbase的API操作
首先引入hbase的依赖
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.4.13</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
将hbase-site.xml放入到resources文件夹中
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<!--
The following properties are set for running HBase as a single process on a
developer workstation. With this configuration, HBase is running in
"stand-alone" mode and without a distributed file system. In this mode, and
without further configuration, HBase and ZooKeeper data are stored on the
local filesystem, in a path under the value configured for `hbase.tmp.dir`.
This value is overridden from its default value of `/tmp` because many
systems clean `/tmp` on a regular basis. Instead, it points to a path within
this HBase installation directory.
Running against the `LocalFileSystem`, as opposed to a distributed
filesystem, runs the risk of data integrity issues and data loss. Normally
HBase will refuse to run in such an environment. Setting
`hbase.unsafe.stream.capability.enforce` to `false` overrides this behavior,
permitting operation. This configuration is for the developer workstation
only and __should not be used in production!__
See also https://hbase.apache.org/book.html#standalone_dist
-->
<property>
<name>hbase.rootdir</name>
<value>hdfs://ns1/hbase</value>
</property>
<!-- hbase在hdfs中的存储位置 -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- 开启hbase的全分布式 -->
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<!-- zookeeper的端口号 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>nn1,nn2,s1</value>
</property>
<!-- zookeeper集群的主机名 -->
<property>
<name>hbase.tmp.dir</name>
<value>./tmp</value>
</property>
<!-- hbase的临时文件存储路径 -->
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
<!-- 设置为false以关闭对文件系统流能力的强制检查,防止hmaster因检查不通过而无法启动 -->
<property>
<name>hbase.master.info.port</name>
<value>60010</value>
</property>
<!-- 监控页面端口 -->
</configuration>
整体代码如下:
package com.hainiu.hbase;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.ColumnValueFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class TestHbase {
public static Connection connection;
static{
try {
connection = ConnectionFactory.createConnection();
//创建链接
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void TestCreateNameSpace() throws IOException {
Admin admin = connection.getAdmin();
//获取管理员对象
NamespaceDescriptor desc = NamespaceDescriptor.create("test").build();
//创建命名空间描述
admin.createNamespace(desc);
}
public static void TestSearchNameSpace()throws Exception{
Admin admin = connection.getAdmin();
//获取管理员对象
String[] spaces = admin.listNamespaces();
for (String space : spaces) {
System.out.println(space);
}
}
public static void TestCreateTable()throws Exception{
Admin admin = connection.getAdmin();
TableDescriptorBuilder build = TableDescriptorBuilder.newBuilder(TableName.valueOf("test:student"));
//创建表描述对象
ColumnFamilyDescriptor info = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info")).build();
//创建列描述对象
TableDescriptor desc = build.setColumnFamily(info).build();
//将列和表融合
admin.createTable(desc);
}
public static void TestListTable() throws Exception{
Admin admin = connection.getAdmin();
List<TableDescriptor> tableDescriptors = admin.listTableDescriptors();
//创建表查询对象
for (TableDescriptor tableDescriptor : tableDescriptors) {
TableName name = tableDescriptor.getTableName();
System.out.println(name);
}
}
public static void TestDeleteTable()throws Exception{
Admin admin = connection.getAdmin();
admin.disableTable(TableName.valueOf("test:student"));
admin.deleteTable(TableName.valueOf("test:student"));
}
public static void TestInsertData() throws Exception{
Table table = connection.getTable(TableName.valueOf("test:student"));
Put put = new Put(Bytes.toBytes("001"));
//创建插入对象
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes("zhangsan"));
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes("20"));
//增加列值
table.put(put);
}
public static void TestInsertDataBatch() throws Exception{
Table table = connection.getTable(TableName.valueOf("test:student"));
List<Put> list = new ArrayList<Put>();
for(int i=0;i<100;i++){
Put put = new Put(Bytes.toBytes(i));
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes("zhangsan"+i));
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(i));
list.add(put);
}
table.put(list);
}
public static void TestGetData()throws Exception{
Table table = connection.getTable(TableName.valueOf("test:student"));
Get get = new Get(Bytes.toBytes(1));
Result result = table.get(get);
//获取一行内容数据
byte[] name = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
byte[] age = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"));
//列和列族的数据必须是字节数组
String name_str = Bytes.toString(name);
int age_int = Bytes.toInt(age);
//查询完毕的数据要转换为string或者int的原类型
System.out.println(name_str+","+age_int);
}
public static void TestScan()throws Exception{
Table table = connection.getTable(TableName.valueOf("test:student"));
Scan scan = new Scan();
ResultScanner res = table.getScanner(scan);
//创建扫面对象
for(Result r:res){
byte[] name = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
byte[] age = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"));
String name_str = Bytes.toString(name);
int age_int = Bytes.toInt(age);
System.out.println(name_str+","+age_int);
}
}
public static void TestScanLimit()throws Exception{
Table table = connection.getTable(TableName.valueOf("test:student"));
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(10));
scan.withStopRow(Bytes.toBytes(30));
//增加rowkey的扫描范围
ResultScanner res = table.getScanner(scan);
for(Result r:res){
byte[] name = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
byte[] age = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"));
String name_str = Bytes.toString(name);
int age_int = Bytes.toInt(age);
System.out.println(name_str+","+age_int);
}
}
public static void TestScanWithFilter()throws Exception{
Table table = connection.getTable(TableName.valueOf("test:student"));
Scan scan = new Scan();
// ColumnValueFilter filter = new ColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOperator.EQUAL, Bytes.toBytes(30));
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("age"),
//增加过滤器,ColumnValueFilter只能显示出一列,SingleColumnValueFilter能够显示出来所有的列
CompareOperator.EQUAL, Bytes.toBytes(20));
scan.setFilter(filter);
ResultScanner res = table.getScanner(scan);
for(Result r:res){
byte[] name = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
byte[] age = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"));
String name_str = Bytes.toString(name);
int age_int = Bytes.toInt(age);
System.out.println(name_str+","+age_int);
}
}
public static void deleteData() throws Exception{
Table table = connection.getTable(TableName.valueOf("test:student"));
Delete delete = new Delete(Bytes.toBytes(20));
table.delete(delete);
}
public static void main(String[] args) throws Exception{
// TestCreateNameSpace();
// TestSearchNameSpace();
// TestCreateTable();
// TestListTable();
// TestDeleteTable();
// TestInsertData();
// TestInsertDataBatch();
// TestGetData();
// TestScan();
// TestScanLimit();
// TestScanWithFilter();
// deleteData();
connection.close();
}
}