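ruoyi + Hadoop + HBase for Big Data Storage and Query
Foreword
I have a real-world requirement where the data volume may reach roughly 10 billion rows. The existing database is SQL Server, and as collected data keeps accumulating, queries keep getting slower (a query against the current SQL Server already takes tens of seconds), so I looked into ways to improve this.
I considered adding indexes, partitioned tables, and sharding across databases and tables in SQL Server, but the data grows so fast that these would hit a bottleneck again before long, so a more scalable technology is needed. Among the many NoSQL and big data technologies, I mainly considered two options for this scenario: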
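- MongoDB: a JSON document database that can scale out through clustering, but it is better suited to fast queries over records with complex column structures.
- Hadoop: the Swiss Army knife of the big data world, with many companion tools available and good room for later expansion.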
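Because this requirement is simply to find the roll number that corresponds to a given code, I ultimately chose Hadoop, with HBase on top of it for storage and lookup.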
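To make the access pattern concrete, here is a minimal in-memory sketch of "look up a roll number by its code". It is only a stand-in: the class name RowKeyLookupSketch and its methods are hypothetical, and a TreeMap is used purely to mimic a store whose rows are ordered by key. The sample codes and roll numbers are the ones used in the controller later in this post.

```java
import java.util.Optional;
import java.util.TreeMap;

// In-memory stand-in for a table keyed by EPC code (illustration only, not HBase itself).
class RowKeyLookupSketch {
    // Sorted map: row key (EPC code) -> roll number, mimicking a key-ordered store.
    private final TreeMap<String, String> rows = new TreeMap<>();

    void put(String epc, String rollNumber) {
        rows.put(epc, rollNumber);
    }

    // Point lookup by exact key; no scan over unrelated rows is needed.
    Optional<String> findRoll(String epc) {
        return Optional.ofNullable(rows.get(epc));
    }

    public static void main(String[] args) {
        RowKeyLookupSketch table = new RowKeyLookupSketch();
        table.put("E280117020000333BF040B34", "CQ-230309002");
        table.put("E280119120006618A51D032D", "CQ-230308009");
        System.out.println(table.findRoll("E280117020000333BF040B34").orElse("not found"));
        System.out.println(table.findRoll("UNKNOWN").orElse("not found"));
    }
}
```

Since the only query is an exact match on one key, the code itself can serve as the row key, which is how the controller below uses it.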
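Deploying Hadoop
Download it directly from the official site, https://hadoop.apache.org/.
Pay attention to versions, because mismatched versions cause a lot of trouble. I chose Hadoop 3.3.4.
Steps:
- Find the winutils.exe and hadoop.dll files that match your Hadoop version
Copy the winutils.exe and hadoop.dll built for Hadoop 3.3.4 into Hadoop's bin folder, and also copy both files into C:\Windows\System32.
You can find these two files by searching GitHub. Make sure they match your Hadoop version, otherwise Hadoop will not start properly.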
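A quick way to confirm the copy step is a small check like the sketch below. It only tests that the two files exist under the bin folder; it cannot tell whether they were built for the right Hadoop version. The class name HadoopNativeFilesCheck is hypothetical, and the sketch assumes the HADOOP_HOME environment variable points at the Hadoop folder.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

class HadoopNativeFilesCheck {
    // The two Windows helper files named in the step above.
    private static final String[] REQUIRED = {"winutils.exe", "hadoop.dll"};

    // Returns the names of required files that are missing from <hadoopHome>/bin.
    static List<String> missingFiles(Path hadoopHome) {
        List<String> missing = new ArrayList<>();
        Path bin = hadoopHome.resolve("bin");
        for (String name : REQUIRED) {
            if (!Files.isRegularFile(bin.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Assumption: HADOOP_HOME is set to the Hadoop installation folder.
        String home = System.getenv("HADOOP_HOME");
        if (home == null || home.isEmpty()) {
            System.out.println("HADOOP_HOME is not set");
            return;
        }
        List<String> missing = missingFiles(Paths.get(home));
        System.out.println(missing.isEmpty() ? "Both helper files found" : "Missing: " + missing);
    }
}
```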
- File configuration (the configuration files below are all in the hadoop 3.3.4/etc/hadoop folder)
a). hadoop-env.cmd
set JAVA_HOME=C:\Users\Administrator\.jdks\corretto-11.0.21
Note: JAVA_HOME here points to an OpenJDK (open-source) build. In my setup the Oracle JDK did not work, so OpenJDK had to be installed.
b). core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
c). hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/hadoop-3.3.4/data/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/hadoop-3.3.4/data/datanode</value>
</property>
</configuration>
d). yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
</configuration>
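Because Hadoop does not complain about a property name it does not recognize, a typo in these files is easy to miss. One way to double-check them is to load a file and print the names and values that were actually read. The sketch below uses only the JDK's XML parser; the class name SiteXmlReader is hypothetical, and it assumes every property element has both a name and a value child, as in the files above.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

class SiteXmlReader {
    // Parses <configuration><property><name/><value/></property>...</configuration> into a map.
    static Map<String, String> readProperties(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> props = new LinkedHashMap<>();
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element property = (Element) nodes.item(i);
                String name = property.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = property.getElementsByTagName("value").item(0).getTextContent().trim();
                props.put(name, value);
            }
            return props;
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a readable configuration XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<configuration><property><name>fs.defaultFS</name>"
                + "<value>hdfs://localhost:9000</value></property></configuration>";
        readProperties(xml).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Reading the printed names against the Hadoop documentation makes a misspelled key stand out.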
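- Configure environment variables
Set HADOOP_HOME to the Hadoop installation directory, then add %HADOOP_HOME%\bin to Path.
To verify the installation and configuration, run this in a console: hadoop version
Finally, run start-all.cmd in a console to start Hadoop (the script lives in Hadoop's sbin folder; on a first run the NameNode normally has to be formatted beforehand with hdfs namenode -format). If no error messages appear, Hadoop has started successfully.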
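Besides watching the console for errors, you can check whether something is actually listening on the port configured in fs.defaultFS (9000 in core-site.xml above). The following is a minimal sketch; the class name PortCheck and the 2000 ms timeout are arbitrary choices, and an open port only shows that a process accepts connections there, not that HDFS is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 9000 is the port from fs.defaultFS in core-site.xml.
        System.out.println("Port 9000 open: " + isListening("localhost", 9000, 2000));
    }
}
```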
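Deploying HBase
HBase can be downloaded from the official site: https://hbase.apache.org/.
Again, pay close attention to versions. Because I chose Hadoop 3.3.4 above, I paired it with HBase 2.5.5.
Steps:
- Copy the winutils.exe and hadoop.dll files downloaded earlier into HBase's bin directory, in my case E:\hbase-2.5.5\bin.
- File configuration
In HBase's conf directory, open hbase-site.xml and add the following: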
<configuration>
<property>
<name>hbase.rootdir</name>
<value>file:///E:/hbase-2.5.5/root</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>false</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>127.0.0.1</value>
</property>
<property>
<name>hbase.tmp.dir</name>
<value>./tmp</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
</configuration>
As the configuration above requires, create the root and tmp folders under the HBase directory.
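If you prefer to script this step rather than create the folders by hand, a sketch along these lines would do it. The class name HBaseDirs is hypothetical, and the HBase directory is passed as an argument, for example E:\hbase-2.5.5.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class HBaseDirs {
    // Creates <hbaseHome>/root and <hbaseHome>/tmp if they do not exist yet.
    static void ensureDirs(Path hbaseHome) {
        try {
            Files.createDirectories(hbaseHome.resolve("root"));
            Files.createDirectories(hbaseHome.resolve("tmp"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Pass the HBase directory as the only argument.
        if (args.length != 1) {
            System.out.println("usage: HBaseDirs <hbase-home>");
            return;
        }
        ensureDirs(Paths.get(args[0]));
        System.out.println("root and tmp are in place under " + args[0]);
    }
}
```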
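- Configure environment variables (omitted here; follow the same approach as for Hadoop above)
In HBase's bin directory, find start-hbase.cmd and double-click it to start HBase.
(Screenshot: the console window after HBase has finished starting.)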
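Secondary Development Based on RuoYi
I used the RuoYi project directly and added the feature to it. First the required jar packages have to be imported (they all ship with Hadoop and HBase, so they can be referenced directly).
Further jar packages are referenced beyond these; I have not included screenshots of them all, so treat this as a reference only.
The project is based on the Spring Boot framework and implements basic HDFS and HBase functionality.
The controller code is as follows: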
package com.ruoyi.web.controller.roll;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.domain.entity.SysRole;
import com.ruoyi.common.core.page.TableDataInfo;
import com.ruoyi.common.roll.RollEntity;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.*;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.stereotype.Controller;
import org.springframework.util.StopWatch;
import org.springframework.web.bind.annotation.*;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Job;
@Controller
@RequestMapping("/roll")
public class RollController extends BaseController {
    private String prefix = "/roll";

    /**
     * Returns the /roll/add view
     */
    @GetMapping("/add")
    public String add() {
        // long count = rowCountByCoprocessor("mytb");
        // System.out.println("Total rows ->>> " + count);
        return prefix + "/add";
    }

    @PostMapping("/list")
    @ResponseBody
    public TableDataInfo list(String inputEPC) {
        // Sample EPCs used while testing:
        // "E280117020000333BF040B34", "E280119120006618A51D032D"
        String epc = inputEPC;
        String tableName = "mytb";
        String columnFamily = "mycf";
        // One-off setup, run once before querying:
        // create(tableName, columnFamily);
        // insert(tableName, columnFamily);
        long startTime = System.currentTimeMillis();
        String reVal = query(tableName, columnFamily, epc);
        long endTime = System.currentTimeMillis();
        System.out.println("Roll number query time: " + (endTime - startTime) + "ms");
        RollEntity model = new RollEntity();
        model.epc = epc;
        model.rollName = reVal;
        model.searchTime = (endTime - startTime) + "ms";
        List<RollEntity> list = new ArrayList<>();
        list.add(model);
        return getDataTable(list);
    }

    // Create a table
    public static void create(String tableName, String columnFamily) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "localhost");
        TableName tn = TableName.valueOf(tableName);
        // try-with-resources closes the connection and admin even if an exception is thrown
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            if (admin.tableExists(tn)) {
                System.err.println("Table exists!");
                return;
            }
            // Builder API; HTableDescriptor and HColumnDescriptor are deprecated in HBase 2.x
            TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tn)
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(columnFamily))
                    .build();
            admin.createTable(tableDesc);
            System.err.println("Create Table SUCCESS!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Insert data
    public static void insert(String tableName, String columnFamily) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "localhost");
        // try-with-resources closes the table and connection on every path
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            // Bulk test data, one Put per row (commented out):
            // for (int i = 17742000; i <= 100000000; i++) {
            //     Put put = new Put(Bytes.toBytes("row" + i));
            //     put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("code"),
            //             Bytes.toBytes("E280119120006BEEA4E5032" + i));
            //     table.put(put);
            // }
            // Put put = new Put(Bytes.toBytes("E280119120006618A51D032D"));
            // put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("code"),
            //         Bytes.toBytes("CQ-230308009"));
            // table.put(put);
            // The row key is the EPC; the roll number is stored in the "code" column
            Put put = new Put(Bytes.toBytes("E280117020000333BF040B34"));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("code"),
                    Bytes.toBytes("CQ-230309002"));
            table.put(put);
            System.err.println("record insert SUCCESS!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Query by row key
    public static String query(String tableName, String columnFamily, String rowName) {
        String reVal = "";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "localhost");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowName));
            // Read only the requested column family
            get.addFamily(Bytes.toBytes(columnFamily));
            Result r = table.get(get);
            // rawCells() can be null when the row does not exist, so check first
            if (!r.isEmpty()) {
                Cell cell = r.rawCells()[0];
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                String value = Bytes.toString(CellUtil.cloneValue(cell));
                System.out.println("Column: " + family + ":" + qualifier + " value: " + value);
                // Return the first cell's value
                reVal = value;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return reVal;
    }

    // Filtered query: scans the whole table and keeps only cells whose value equals the given value
    public static void queryFilter(String tableName, String columnFamily, String rowName, String value) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "localhost");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            Filter filter = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(value)));
            scan.setFilter(filter);
            // The scanner holds server-side resources, so close it when done
            try (ResultScanner rs = table.getScanner(scan)) {
                for (Result res : rs) {
                    System.out.println(res);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Read an HDFS file
    private static void readHDFSFileContents() {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000");
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Open the file through the FileSystem API; a plain java.net.URL does not
        // understand hdfs:// unless FsUrlStreamHandlerFactory has been registered
        try (InputStream is = FileSystem.get(conf).open(new Path("/myHadoop/1.txt"))) {
            IOUtils.copyBytes(is, buffer, 4096, false);
            // Decode once at the end so multi-byte characters are not split across reads
            // (assumes the file is UTF-8 text)
            System.out.println(buffer.toString("UTF-8"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Create an HDFS directory
    private static void createHDFSDirectory() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000");
            FileSystem fs = FileSystem.get(conf);
            boolean result = fs.mkdirs(new Path("/myHadoop"));
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Count the rows in an HBase table
    public long rowCountByCoprocessor(String tablename) {
        long count = 0;
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "localhost");
        TableName name = TableName.valueOf(tablename);
        String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin();
             AggregationClient aggregationClient = new AggregationClient(conf)) {
            // Read the table's real descriptor; a freshly constructed one never reaches the server
            TableDescriptor descriptor = admin.getDescriptor(name);
            if (!descriptor.hasCoprocessor(coprocessorClass)) {
                // Register the aggregation coprocessor on the table before counting
                admin.modifyTable(TableDescriptorBuilder.newBuilder(descriptor)
                        .setCoprocessor(coprocessorClass)
                        .build());
            }
            // Timing
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            count = aggregationClient.rowCount(name, new LongColumnInterpreter(), new Scan());
            stopWatch.stop();
            System.out.println("Count took: " + stopWatch.getTotalTimeMillis() + "ms");
        } catch (Throwable e) {
            e.printStackTrace();
        }
        return count;
    }
}
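The commented-out bulk loop in insert writes one row per put call, which means one round trip per row. A common alternative is to group rows into batches and send each batch in a single call; with HBase that would be Table.put(List<Put>). The sketch below shows only the batching part in plain Java, so it runs without a cluster. The class name BatchSketch and the batch size of 4 are illustrative assumptions, not values from the project.

```java
import java.util.ArrayList;
import java.util.List;

class BatchSketch {
    // Splits items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> rowKeys = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            rowKeys.add("row" + i);
        }
        // With HBase, each batch would be converted to Put objects and written in one call.
        for (List<String> batch : partition(rowKeys, 4)) {
            System.out.println(batch.size() + " rows: " + batch);
        }
    }
}
```

For a real load the batch size would be much larger and is worth tuning against the cluster.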