Kafka详解 ③ | Kafka集群操作与API操作
目录
1、Kafka集群操作
1.1、创建 topic
1.2、查看主题命令
1.3、生产者生产
1.4、消费者消费数据
1.5、运行 describe topics命令
1.6、增加 topic分区数
1.7、增加配置
1.8、删除配置
1.9、删除 topic
2、Kafka的Java API操作
2.1、生产者代码
2.2、消费者代
2.2.1、自动提交 offset
2.2.2、手动提交 offset
2.2.3、消费完每个分区之后手动提交 offset
2.2.4、指定分区数据进行消费
2.2.5、重复消费与数据丢失
2.2.6 consumer消费者消费数据流程
2.3、kafka Streams API开发
C++软件异常排查从入门到精通系列教程(核心精品专栏,订阅量已达600多个,欢迎订阅,持续更新...)https://blog.csdn.net/chenlycly/article/details/125529931C/C++实战专栏(重点专栏,专栏文章已更新480多篇,订阅量已达数百个,欢迎订阅,持续更新中...)https://blog.csdn.net/chenlycly/article/details/140824370C++ 软件开发从入门到实战(重点专栏,专栏文章已更新280多篇,欢迎订阅,持续更新中...)https://blog.csdn.net/chenlycly/category_12695902.htmlVC++常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...)https://blog.csdn.net/chenlycly/article/details/124272585C++软件分析工具从入门到精通案例集锦(专栏文章,持续更新中...)https://blog.csdn.net/chenlycly/article/details/131405795开源组件及数据库技术(专栏文章,持续更新中...)https://blog.csdn.net/chenlycly/category_12458859.html网络编程与网络问题分享(专栏文章,持续更新中...)https://blog.csdn.net/chenlycly/category_2276111.html
1、Kafka集群操作
1.1、创建 topic
创建一个名字为 test的主题:
# 三个分区,两个副本
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2
--partitions 3 --topic test
1.2、查看主题命令
查看 kafka当中存在的主题:
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
1.3、生产者生产
模拟生产者来生产数据:
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
1.4、消费者消费数据
模拟消费者进行消费数据:
bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper node01:2181,node02:2181,node03:2181
1.5、运行 describe topics命令
查看 topic的相关信息:
# 执⾏以下命令运⾏describe查看topic的相关信息
bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
结果说明:
这是输出的解释。第一行给出了所有分区的摘要, 每个附加行提供有关一个分区的信息。由于我们只有一个分 区用于此主题, 因此只有一行。
- “leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。 (因为在 kafka中 如果有多个副本的话, 就会存在 leader和 follower的关系, 表示当前这个副本为 leader所在的 broker是哪一个)
- “replicas”是复制此分区日志的节点列表, 无论它们是否为领导者,或者即使它们当前处于活动状态。 (所有副本列表0,1,2)
- “isr”是“同步”复制品的集合。这是副本列表的子集, 该列表当前处于活跃状态并且已经被领导者捕获。 (可用的列表数)
1.6、增加 topic分区数
增加 topic分区数:
bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
1.7、增加配置
动态修改 kakfa的配置:
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1
1.8、删除配置
动态删除 kafka集群配置:
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages
1.9、删除 topic
目前删除 topic在默认情况下知识打上一个删除的标记, 在重新启动 kafka后才删除。如果需要立即删除,则需要在 server. properties中配置:delete. topic. enable= true,然后执行以下命令进行删除 topic:
kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
在这里,给大家重点推荐一下我的几个热门畅销专栏,欢迎订阅:(博客主页还有其他专栏,可以去查看)
专栏1:(该精品技术专栏的订阅量已达到580多个,专栏中包含大量项目实战分析案例,有很强的实战参考价值,广受好评!专栏文章已经更新到200篇以上,持续更新中!欢迎订阅!)
C++软件调试与异常排查从入门到精通系列文章汇总(专栏文章,持续更新中...)https://blog.csdn.net/chenlycly/article/details/125529931
本专栏根据多年C++软件异常排查的项目实践,系统地总结了引发C++软件异常的常见原因以及排查C++软件异常的常用思路与方法,详细讲述了C++软件的调试方法与手段,以图文并茂的方式给出具体的项目问题实战分析实例(很有实战参考价值),带领大家逐步掌握C++软件调试与异常排查的相关技术,适合基础进阶和想做技术提升的相关C++开发人员!
考察一个开发人员的水平,一是看其编码及设计能力,二是要看其软件调试能力!所以软件调试能力(排查软件异常的能力)很重要,必须重视起来!能解决一般人解决不了的问题,既能提升个人能力及价值,也能体现对团队及公司的贡献!
专栏中的文章都是通过项目实战总结出来的,包含大量项目问题实战分析案例,有很强的实战参考价值!专栏文章还在持续更新中,预计文章篇数能更新到200篇以上!
专栏2:(本专栏涵盖了C++多方面的内容,是当前重点打造的专栏,订阅量已达220多个,专栏文章已经更新到480多篇,持续更新中!欢迎订阅!)
C/C++实战进阶(专栏文章,持续更新中...)https://blog.csdn.net/chenlycly/category_11931267.html
以多年的开发实战为基础,总结并讲解一些的C/C++基础与项目实战进阶内容,以图文并茂的方式对相关知识点进行详细地展开与阐述!专栏涉及了C/C++领域多个方面的内容,包括C++基础及编程要点(模版泛型编程、STL容器及算法函数的使用等)、数据结构与算法、C++11及以上新特性(不仅看开源代码会用到,日常编码中也会用到部分新特性,面试时也会涉及到)、常用C++开源库的介绍与使用、代码分享(调用系统API、使用开源库)、常用编程技术(动态库、多线程、多进程、数据库及网络编程等)、软件UI编程(Win32/duilib/QT/MFC)、C++软件调试技术(排查软件异常的手段与方法、分析C++软件异常的基础知识、常用软件分析工具使用、实战问题分析案例等)、设计模式、网络基础知识与网络问题分析进阶内容等。
专栏3:
C++常用软件分析工具从入门到精通案例集锦汇总(专栏文章,持续更新中...)https://blog.csdn.net/chenlycly/article/details/131405795
常用的C++软件辅助分析工具有SPY++、PE工具、Dependency Walker、GDIView、Process Explorer、Process Monitor、API Monitor、Clumsy、Windbg、IDA Pro等,本专栏详细介绍如何使用这些工具去巧妙地分析和解决日常工作中遇到的问题,很有实战参考价值!
专栏4:
VC++常用功能开发汇总(专栏文章,持续更新中...)https://blog.csdn.net/chenlycly/article/details/124272585
将10多年C++开发实践中常用的功能,以高质量的代码展现出来。这些常用的高质量规范代码,可以直接拿到项目中使用,能有效地解决软件开发过程中遇到的问题。
专栏5: (本专栏涵盖了C++多方面的内容,是当前重点打造的专栏,专栏文章已经更新到280多篇,持续更新中!欢迎订阅!)
C++ 软件开发从入门到实战(专栏文章,持续更新中...)https://blog.csdn.net/chenlycly/category_12695902.html
根据多年C++软件开发实践,详细地总结了C/C++软件开发相关技术实现细节,分享了大量的实战案例,很有实战参考价值。
2、Kafka的Java API操作
2.1、生产者代码
使用生产者, 生产数据:
/**
* 订单的⽣产者代码,
*/
public class OrderProducer {
public static void main(String[] args) throws InterruptedException {
/* 1、连接集群,通过配置⽂件的⽅式
* 2、发送数据-topic:order,value
*/
Properties props = new Properties(); props.put("bootstrap.servers"
, "node01:9092"); props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 1000; i++) {
// 发送数据 ,需要⼀个producerRecord对象,最少参数 String topic, Value
kafkaProducer.send(new ProducerRecord<String, String>("order", "订单信息!"+i));
Thread.sleep(100);
}
}
}
kafka当中的数据分区:
kafka生产者发送的消息,都是保存在 broker当中,我们可以自定义分区规则,决定消息发送到哪个 partition里面去进行保存查看ProducerRecord这个类的源码,就可以看到 kafka的各种不同分区策略 kafka当中支持以下四种数据的分区方式:
//第⼀种分区策略,如果既没有指定分区号,也没有指定数据key,那么就会使⽤轮询的⽅式将数据
均匀的发送到不同的分区⾥⾯去
//ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("mypartition", "mymessage" + i);
//kafkaProducer.send(producerRecord1);
//第⼆种分区策略 如果没有指定分区号,指定了数据key,通过key.hashCode % numPartitions来计算数据究竟会保存在哪⼀个分区⾥⾯
//注意:如果数据key,没有变化 key.hashCode % numPartitions = 固定值 所有的数据都会写⼊到某⼀个分区⾥⾯去
//ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("mypartition", "mykey", "mymessage" + i);
//kafkaProducer.send(producerRecord2);
//第三种分区策略:如果指定了分区号,那么就会将数据直接写⼊到对应的分区⾥⾯去
// ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>
//("mypartition", 0, "mykey", "mymessage" + i);
// kafkaProducer.send(producerRecord3);
//第四种分区策略:⾃定义分区策略。如果不⾃定义分区规则,那么会将数据使⽤轮询的⽅式均匀的
//发送到各个分区⾥⾯去
kafkaProducer.send(new ProducerRecord<String, String>("mypartition","mymes
sage"+i));
public class KafkaCustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object arg1, byte[] keyBytes, Objec
t arg3, byte[] arg4, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic)
;
int partitionNum = partitions.size();
Random random = new Random();
int partition = random.nextInt(partitionNum);
return partition;
}
@Override
public void close() {
}
}
2.2、消费者代
消费必要条件:
消费者要从 kafka Cluster进行消费数据, 必要条件有以下四个:
- 地址:bootstrap. servers=node01:9092。
- 序列化:key.serializer= org.apache.kafka.common.serialization.StringSerializer value.serializer= org.apache.kafka.common.serialization.StringSerializer。
- 主题 ( topic) :需要制定具体的某个 topic ( order) 即可。
- 消费者组:group. id= test。
2.2.1、自动提交 offset
消费完成之后, 自动提交:
/**
* 消费订单数据--- javaben.tojson
*/
public class OrderConsumer {
public static void main(String[] args) {
// 1\连接集群
Properties props = new Properties(); props.put("bootstrap.servers"
, "hadoop-01:9092"); props.put("group.id", "test");
//以下两⾏代码 ---消费者⾃动提交offset值
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serializati
on.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serializa
tion.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<St
ring, String>
(props);
// 2、发送数据 发送数据需要,订阅下要消费的topic。 order kafkaConsume
r.subscribe(Arrays.asList("order"));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsume
r.poll(100);// jdk queue offer插⼊、poll获取元素。 blockingqueue put插⼊原
⽣, take获取元素
for (ConsumerRecord<String, String> record : consumerRecords)
{ System.out.println("消费的数据为:" + record.value());
}
}
}
}
2.2.2、手动提交 offset
如果 Consumer在获取数据后,需要加入处理,数据完毕后才确认offset,需要程序来控制 offset的确认。
关闭自动提交确认选项:props.put(" enable.auto.commit", " false");
手动提交 offset值:kafkaConsumer.commitSync();
完整代码如下:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
//关闭⾃动提交确认选项
props.put("enable.auto.commit", "false");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consu
mer.subscribe(Arrays.asList("test"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
// ⼿动提交offset值
consumer.commitSync();
buffer.clear();
}
}
2.2.3、消费完每个分区之后手动提交 offset
上面的示例使用commitSync将所有已接收的记录标记为已提交。在某些情况下,可能希望通过明确指定偏移量来更好地控制已提交的记录。在下面的示例中,我们在完成处理每个分区中的记录后提交偏移量:
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_V
ALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = record
s.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords)
{ System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size()
-1).offset();
consumer.commitSync(Collections.singletonMap(partition, new Of
fsetAndMetadata(lastOffset + 1)));
}
}
} finally { consumer.close();}
注意事项:
提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。因此,在调用commitSync (偏移量)时,应该在最后处理的消息的偏移量中添加一个。
2.2.4、指定分区数据进行消费
如果进程正在维护与该分区关联的某种本地状态 (如本地磁盘上的键值存储) ,那么它应该只获取它在磁盘上维护的分区的记录。
如果进程本身具有高可用性,并且如果失败则将重新启动 (可能使用YARN, Mesos或AWS工具等集群管理框架,或作为流处理框架的一部分) 。在这种情况下,Kafka不需要检测故障并重新分配分区,因为消耗过程将在另一台机器上重新启动。
Properties props = new Properties(); props.put("bootstrap.servers", "local
host:9092"); props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//consumer.subscribe(Arrays.asList("foo", "bar"));
//⼿动指定消费指定分区的数据---start
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(
Arrays.asList(partition0, partition1));
//⼿动指定消费指定分区的数据---end
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.of
fset(), record.key(), record.value());
}
注意事项:
- 要使用此模式,只需使用要使用的分区的完整列表调用 assign ( Collection) ,而不是使用 subscribe订阅主题。
- 主题与分区订阅只能二选一。
2.2.5、重复消费与数据丢失
说明:
- 1)已经消费的数据对于 kafka来说,会将消费组里面的 offset值进行修改, 那什么时候进行修改了?是在数据消费完成之后, 比如在控制台打印完后自动提交。
- 2)提交过程: 是通过 kafka将 offset进行移动到下个 message所处的 offset的位置。
- 3)拿到数据后,存储到 hbase中或者 mysql中,如果 hbase或者 mysql在这个时候连接不上,就会抛出异常。如果在处理数据的时候已经进行了提交, 那么 kafka伤的offset值已经进行了修改了,但是 hbase或者 mysql中没有数据,这个时候就会出现数据丢失。
- 4)什么时候提交 offset值?在 Consumer将数据处理完成之后,再来进行 offset的修改提交。默认情况下 offset是 自动提交,需要修改为手动提交 offset值。
- 5)如果在处理代码中正常处理了,但是在提交 offset请求的时候,没有连接到 kafka或者出现了故障,那么该次修 改 offset的请求是失败的,那么下次在进行读取同一个分区中的数据时,会从已经处理掉的 offset值再进行处理一次,那么在 hbase中或者 mysql中就会产生两条一样的数据,也就是数据重复。
2.2.6 consumer消费者消费数据流程
流程描述:
Consumer连接指定的 Topic partition所在 leader broker,采用 pull方式从 kafkalogs中获取消息。对于不同的消费模式,会将 offset保存在不同的地方官网关于 high level API 以及 low level API的简介:http://kafka.apache.org/0100/documentation.html#impl_consumer
高阶API ( High Level API) :
kafka消费者高阶API简单;隐藏 Consumer与 Broker细节;相关信息保存在 zookeeper中:
/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
/**
This method is used to get a list of KafkaStreams, which are itera
tors over
MessageAndMetadata objects from which you can obtain messages and
their
associated metadata (currently only topic).
Input: a map of <topic, #streams>
Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,I
nt> topicCountMap);
/**
You can also obtain a list of KafkaStreams, that iterate over mess
ages
from topics that match a TopicFilter. (A TopicFilter encapsulates
a
whitelist or a blacklist which is a standard Java regex.)
*/
public List<KafkaStream> createMessageStreamsByFilter( TopicFilter top
icFilter, int numStreams);
/*
Commit the offsets of all messages consumed so far.
*/
public commitOffsets()
/*
Shut down the connector
*/
public shutdown()
}
说明:大部分的操作都已经封装好了,比如当前消费到哪个位置下了,但是不够灵活 (工作过程推荐使用)
低级API( Low Level API):
kafka消费者低级API非常灵活;需要自己负责维护连接 Controller Broker。保存 offset,Consumer Partition对应关系:
class SimpleConsumer {
/* Send fetch request to a broker and get back a set of messages. */
public ByteBufferMessageSet fetch(FetchRequest request);
/* Send a list of fetch requests to a broker and get back a response set.
*/
public MultiFetchResponse multifetch(List<FetchRequest> fetches);
/**
Get a list of valid offsets (up to maxSize) before the given time.
The result is a list of offsets, in descending order.
@param time: time in millisecs,
if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest
offset available. if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get fr
om the earliest
available. public long[] getOffsetsBefore(String topic, int partition, lon
g time, int maxNumOffsets);
* offset
*/
说明:没有进行包装,所有的操作有用户决定,如自己的保存某一个分区下的记录,你当前消费到哪个位置。
2.3、kafka Streams API开发
需求:使用StreamAPI获取 test这个 topic当中的数据,然后将数据全部转为大写,写入到test2这个 topic 当中去。
第一步:创建一个 topic
node01服务器使用以下命令来常见一个 topic 名称为test2:
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
第二步:开发StreamAPI
public class StreamAPI {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-applicat
ion");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().ge
tClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().
getClass());
KStreamBuilder builder = new KStreamBuilder();
builder.stream("test").mapValues(line -> line.toString().toUpperCa
se()).to("test2");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}
执行上述代码,监听获取 test 中的数据,然后转成大写,将结果写入test2。
第三步: 生产数据
node01执行以下命令,向 test这个 topic当中生产数据:
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:
9092 --topic test
第四步: 消费数据
node02执行一下命令消费test2这个 topic当中的数据:
bin/kafka-console-consumer.sh --from-beginning --topic test2 --zookeeper n
ode01:2181,node02:2181,node03:2181