大数据-147 Apache Kudu 常用 Java API 增删改查
点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(正在更新…)
章节内容
上节我们完成了如下的内容:
- Apache Kudu 的 Dockerfile
- Dockerfile 详解
- Kudu 启动访问
新建工程
由于重复了太多次,这里直接跳过了。
导入依赖
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.4.0</version>
</dependency>
创建新表
- 必须指定表连接到的Master节点主机名
- 必须定义Schema
- 必须指定副本数量、分区策略、数量
编写代码
package icu.wzk.kudu;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import java.util.ArrayList;
import java.util.List;
public class KuduCreateTable {
public static void main(String[] args) throws KuduException {
String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddress);
KuduClient kuduClient = kuduClientBuilder.build();
String tableName = "student";
List<ColumnSchema> columnSchemas = new ArrayList<>();
ColumnSchema id = new ColumnSchema
.ColumnSchemaBuilder("id", Type.INT32)
.key(true)
.build();
columnSchemas.add(id);
ColumnSchema name = new ColumnSchema
.ColumnSchemaBuilder("name", Type.STRING)
.key(false)
.build();
columnSchemas.add(name);
Schema schema = new Schema(columnSchemas);
CreateTableOptions options = new CreateTableOptions();
// 副本数量为1
options.setNumReplicas(1);
List<String> colrule = new ArrayList<>();
colrule.add("id");
options.addHashPartitions(colrule, 3);
kuduClient.createTable(tableName, schema, options);
kuduClient.close();
}
}
测试运行
控制台未输出内容
运行结果如下图所示:
查看Kudu
我们查看Kudu的Tables,可以看到刚才创建的表如下:
删除表
编写代码
package icu.wzk.kudu;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
public class KuduDeleteTable {
public static void main(String[] args) throws KuduException {
String masterAddress = "localhost:7051,localhost:7151,localhost:7251,";
KuduClient client = new KuduClient.KuduClientBuilder(masterAddress)
.defaultAdminOperationTimeoutMs(5000)
.build();
client.deleteTable("student");
client.close();
}
}
测试运行
控制台没有输出内容,这里运行截图如下:
查看Kudu
查看Kudu服务的 Table 页,里边的数据表已经删除了。
插入数据
- 获取客户端
- 打开一张表
- 创建会话
- 设置刷新模式
- 获取插入实例
- 声明带插入的数据
- 刷入数据
- 应用插入实例
- 关闭会话
创建新表
我们运行刚才的创建新表代码,把student表先生成出来,具体运行这里跳过了。
编写代码
package icu.wzk.kudu;
import org.apache.kudu.client.*;
public class KuduInsert {
public static void main(String[] args) throws KuduException {
String masterAddr = "localhost:7051,localhost:7151,localhost:7251";
KuduClient client = new KuduClient
.KuduClientBuilder(masterAddr)
.defaultAdminOperationTimeoutMs(5000)
.build();
KuduTable stuTable = client.openTable("student");
KuduSession kuduSession = client.newSession();
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
Insert insert = stuTable.newInsert();
insert.getRow().addInt("id", 1);
insert.getRow().addString("name", "wzk");
kuduSession.flush();
kuduSession.apply(insert);
kuduSession.close();
client.close();
}
}
在代码中,有一个叫:kuduSession.setFlushMode:
- AUTO_FLUSH_SYNC(默认):意思是调用KuduSession apply方法后,客户端会在当前刷新到服务器后再返回,这种情况不能够批量插入数据,调用 flush 方法不会起作用,应为此时缓冲区已经被刷新到了服务器。
- AUTO_FLUSH_BACKGROUD:意思是调用apply方法后,客户端会立即返回,但是写入将在后台发送,可能与来自同一会话的其他写入一起进行批处理。如果没有足够的缓冲空间,KuduSession apply会阻塞,缓冲空间不可用。因为写入操作是在后台进行的,因此任何一个错误都将存储在一个会话本地缓冲区中。注意:这个模式可能会导致插入是乱序的,这是因为在这种模式下,多个写操作可以并发的发送到服务器。且这是一个Kudu的BUG,详细请看:https://issues.apache.org/jira/browse/KUDU-1767https://issues.apache.org/jira/browse/KUDU-1767
- MANUAL_FLUSH:调用apply后,会非常快的返回,但是写操作不会发送,直到用户使用flush函数,如果缓冲区超过了限制大小,apply就会返回一个错误。
测试运行
控制台无输出内容,运行的截图如下图所示:
查询数据
编写代码
Kudu的查询数据用Scanner
package icu.wzk.kudu;
import org.apache.kudu.client.*;
public class KuduSelect {
public static void main(String[] args) throws KuduException {
String masterAddr = "localhost:7051,localhost:7151,localhost:7251";
KuduClient client = new KuduClient
.KuduClientBuilder(masterAddr)
.build();
KuduTable kuduTable = client.openTable("user");
KuduScanner kuduScanner = client.newScannerBuilder(kuduTable).build();
while (kuduScanner.hasMoreRows()) {
for (RowResult result : kuduScanner.nextRows()) {
int id = result.getInt("id");
String name = result.getString("name");
int age = result.getInt("age");
System.out.println("id: " + id + ", name: " + name + ", age: " + age);
}
}
client.close();
}
}
测试运行
id: 1, name: wzk
Process finished with exit code 0
运行结果如下图所示:
更改数据
编写代码
package icu.wzk.kudu;
import org.apache.kudu.client.*;
public class KuduUpdate {
public static void main(String[] args) throws KuduException {
String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
KuduClient client = new KuduClient
.KuduClientBuilder(masterAddress)
.build();
KuduTable stuTable = client.openTable("student");
KuduSession kuduSession = client.newSession();
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
Update update = stuTable.newUpdate();
update.getRow().addInt("id", 1);
update.getRow().addString("name", "wzk_icu");
kuduSession.apply(update);
kuduSession.close();
client.close();
}
}
删除指定行
编写代码
package icu.wzk.kudu;
import org.apache.kudu.client.*;
public class KuduDelete {
public static void main(String[] args) throws KuduException {
String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
KuduClient client = new KuduClient
.KuduClientBuilder(masterAddress)
.build();
KuduSession kuduSession = client.newSession();
KuduTable stuTable = client.openTable("student");
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
Delete delete = stuTable.newDelete();
PartialRow row = delete.getRow();
row.addInt("id", 1);
kuduSession.flush();
kuduSession.apply(delete);
kuduSession.close();
client.close();
}
}
测试运行
控制台没有输出任何内容,运行过程截图如下: