kafka生产者专题(原理+拦截器+序列化+分区+数据可靠+数据去重+事务)
目录
- 生产者
- 发送数据原理
- 参数说明
- 代码示例(同步发送数据)
- 代码示例(异步)
- 异步和同步的区别
- 同步发送
- 定义与流程
- 特点
- 异步发送
- 定义与流程
- 特点
- 异步回调
- 描述
- 代码示例
- 拦截器
- 描述
- 代码示例
- 消息序列化
- 描述
- 代码示例(自定义序列化)
- 分区
- 描述
- 分区策略
- 代码示例
- 写入默认分区(0号分区)
- 自定义分区机制
- 消息丢失
- 消息绝对不丢失的条件
- 数据去重
- 描述
- 幂等性
- 事务
- 代码示例(事务)
生产者
发送数据原理
说明
- 拦截器允许有多个,可以组成拦截器链
- 生产者发送的消息会被分配到不同的分区(Partition)。每个分区在内存中都有一个对应的缓冲区(RecordAccumulator),用于暂存即将发送的消息。sender线程从中读取数据
- sender线程两个重要参数(大小默认16KB,当数据量达到16KB时读取,读取时间默认0ms,当达到读取时间时,自动读取数据,不管大小有没有达到),这两个参数均可以调整
- NetworkClient负责将生产者的请求(如发送消息、获取元数据等)发送到相应的broker,并存储这些请求的响应;NetworkClient还负责处理网络连接的建立、维护和关闭。
- 当生产者发送消息到broker时,可以选择不同的应答级别(acks参数):
acks=0:生产者不等待broker的应答,直接认为消息发送成功。这种方式性能最高,但可靠性最低。
acks=1:生产者等待leader broker的应答,只要leader broker确认收到消息,就认为消息发送成功。这种方式性能较高,但可靠性略低。
acks=all(或acks=-1):leader和follower都落地回应,才认为消息发送成功。这种方式性能最低,但可靠性最高。
Broker在收到消息后,会根据配置的应答机制向生产者发送应答(或错误)信息。 - 请注意关于broker的落地是指数据存储到磁盘或持久化
- 数据发送成功后(接收到应答)删除缓冲区内对应的数据
参数说明
参数 | 默认值 | 作用描述 |
---|---|---|
bootstrap.servers | node2:9092[,node3:9092][,node4:9092] | 生产者连接集群所需的broker地址清单,一个或多个(逗号隔开) |
key.serializer | (无) | 指定发送消息的key的序列化类型,必须写全类名 |
value.serializer | (无) | 指定发送消息的value的序列化类型,必须写全类名 |
buffer.memory | 32M | RecordAccumulator缓冲区总大小 |
batch.size | 16K | 缓冲区一批数据最大值,适当增加可提高吞吐量,但可能增加延迟 |
linger.ms | 0ms (表示没有延迟) | 如果数据未达到batch.size,sender等待linger.time后发送数据 |
acks | -1 | 应答机制:0-不需要应答,1-Leader应答,-1(all)-所有节点应答 |
max.in.flight.requests.per.connection | 5 | 允许最多没有返回ack的次数,开启幂等性时建议1-5之间 |
enable.idempotence | true | 是否开启幂等性,默认开启 |
retries | 2147483647 (int最大值) | 消息发送错误时的重试次数 |
retry.backoff.ms | 100ms | 两次重试之间的时间间隔 |
compression.type | none | 生产者发送的所有数据的压缩方式,默认不压缩 |
代码示例(同步发送数据)
package com.wunaiieq;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SyncCustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//TODO 1.声明并实例化Kafka Producer的配置文件对象
Properties prop = new Properties();
//TODO 2.为配置文件对象设置参数
//TODO 2.1 配置bootstrap_server(生产者连接集群所需的broker地址清单)
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
//TODO 2.2 配置key和value的序列化类
// 设置序列化器:指定key和value的序列化类为StringSerializer,用于将字符串类型的key和value转换为字节数组,以便发送到Kafka。
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//TODO 3.声明并实例化生产者对象
KafkaProducer<String,String> producer =
new KafkaProducer<String, String>(prop);
//TODO 4.发送消息
// producer.send(...).get():同步发送消息,send()方法返回一个Future对象,调用get()方法等待发送完成并获取结果。
for(int i = 0;i<5;i++){
//同步发送消息
producer.send(new ProducerRecord<>("topicA","sync_msg"+i)).get();
}
//TODO 5.关闭生产者
producer.close();
}
}
代码说明
类/对象 | 描述 | 用途 |
---|---|---|
Properties | Java标准库中的类,用于维护键值对列表。 Properties类提供了一种方便的方式来读取和写入属性文件(通常是.properties文件) | 在本代码中用于存储Kafka生产者的配置参数。 |
KafkaProducer<K, V> | Kafka客户端库中的类,用于向Kafka主题发送消息。泛型参数K 和V 分别表示消息键和值的类型。 | 创建生产者实例,发送消息到Kafka主题。 |
ProducerConfig | Kafka客户端库中的类,包含生产者配置的常量。 | 提供配置参数的常量值,如broker地址、序列化器等。 |
ProducerRecord<K, V> | Kafka客户端库中的类,表示要发送到Kafka主题的消息记录。泛型参数K 和V 分别表示消息键和值的类型。 | 创建消息记录对象,包含主题、键和值。 |
StringSerializer | Kafka客户端库中的类,实现了Serializer<String> 接口,用于将字符串类型的键或值序列化为字节数组。 | 作为键和值的序列化器,将字符串转换为字节数组进行传输。 |
效果
代码示例(异步)
package com.wunaiieq;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class UnSyncCustomProducer {
public static void main(String[] args) {
//实例化Properties
Properties prop = new Properties();
//集群节点
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
//key和value
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//创建kafka生产者对象,并写入响应参数
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//发送数据
for (int i = 0; i < 5; i++) {
//异步发送数据,不调用get方法
producer.send(new ProducerRecord<>(
"topicA", "unsync_msg" + i
));
}
producer.close();
}
}
异步和同步的区别
同步发送
定义与流程
定义: 同步发送是指生产者在发送一条消息后,会立即等待Kafka服务器的响应。只有在服务器返回成功响应后,生产者才会继续发送下一条消息。
流程:
生产者调用send()方法发送消息。
send()方法返回一个Future对象。
生产者调用Future对象的get()方法,该方法会阻塞当前线程,直到Kafka服务器返回响应或抛出异常。
生产者收到响应后,根据结果(成功或失败)进行后续操作。
特点
高可靠性:同步发送确保每条消息都被Kafka集群接收并持久化。生产者等待Kafka确认消息已经成功写入指定分区且复制到满足副本因子的节点上,从而提高了消息的可靠性。
异常处理:如果发送过程中发生异常,生产者可以立即感知并处理,避免了消息的丢失。
性能较低:由于同步发送需要阻塞等待响应,因此会增加消息的延迟,降低系统的吞吐量。特别是在高并发场景下,可能会导致线程资源的大量占用和性能瓶颈。
易调试:便于发现和处理异常,有利于开发和测试阶段的调试工作。
异步发送
定义与流程
定义: 异步发送是指生产者在发送一条消息后,不会立即等待Kafka服务器的响应,而是继续发送下一条消息。发送方通过传递一个回调函数给send()方法,该回调函数将在消息发送结果(成功或失败)可用时被异步调用。
流程:
生产者调用send()方法发送消息,并传递一个回调函数。
Kafka客户端将消息放入内部缓冲区,并立即返回。
Sender线程负责将缓冲区中的消息批量发送到Kafka集群。
当消息发送成功或失败时,Kafka客户端调用之前传递的回调函数,通知生产者消息发送的结果。
特点
高性能:异步发送方式下,生产者无需等待每个消息的确认即可继续发送下一条消息,从而提高了消息的发送效率,适用于高吞吐量场景。
灵活性:通过回调函数,生产者可以对消息发送的结果进行异步处理,如记录日志、重试发送等。
可靠性相对较低:由于生产者不会立即得知消息是否成功写入Kafka,因此消息的可靠性需要额外关注。如果生产者在发送消息后立即崩溃,可能会导致部分消息丢失。
调试复杂:由于消息发送和结果是异步的,因此调试时可能需要更多的日志记录和监控手段来确保消息的可靠性和完整性。
异步回调
描述
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。消息发送失败后会自动重试,不需要再回调函数中手动重试。
代码示例
package com.wunaiieq;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class UnSyncCallBackCustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//TODO 1.声明并实例化Kafka Producer的配置文件对象
Properties prop = new Properties();
//TODO 2.为配置文件对象设置参数
// 2.1 配置bootstrap_servers
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
// 2.2 配置key和value的序列化类
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//TODO 3.声明并实例化生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
//TODO 4.发送消息
for(int i = 0;i<5;i++){
//异步发送消息 不调用get()方法
producer.send(new ProducerRecord<>("topicA", "unsync_msg" + i),
new Callback() {
//如下方法在生产者收到acks确认时异步调用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
//无异常信息,笑死发送成功,输出主题和分区信息到控制台
System.out.println("topic:"+recordMetadata.topic()
+",partition:"+recordMetadata.partition());
}else{//打印异常信息
System.out.println(e.getMessage());
}
}
});
Thread.sleep(5);
}
//TODO 5.关闭生产者
producer.close();
}
}
拦截器
描述
拦截器(Interceptor)是kafka0.10.0.0版本中引入的新功能,主要用于实现clients端的定制化控制逻辑。它可以使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时允许指定多个Interceptor按序作用于同一条消息从而形成一个拦截器链(Interceptor Chain)。
自定义拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。
拦截器内部方法
-
onSend
方法- 作用:在消息发送之前进行拦截,允许对消息进行修改或处理。
- 参数:接收一个
ProducerRecord
对象。 - 返回值:返回一个
ProducerRecord
对象,可能是修改后的记录。 - 应用场景:添加消息头、修改消息内容、过滤消息等。
-
onAcknowledgement
方法- 作用:在消息发送成功或失败后进行回调。
- 参数:
RecordMetadata
:包含消息的元数据。Exception
:发送过程中可能抛出的异常,成功时为null
。
- 返回值:无。
- 应用场景:记录发送结果、统计发送成功率、处理发送失败等。
-
close
方法- 作用:在拦截器不再使用时进行资源清理。
- 参数:无。
- 返回值:无。
- 应用场景:关闭打开的文件、释放内存、断开网络连接等。
拦截器Interceptor可能运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外,若指定了多个Interceptor,则producer将按照指定顺序调用它们,同时把每个Interceptor中捕获的异常记录写入到错误日志中而不是向上传递。这在使用过程中要特别留意。
代码示例
实现一个简单的双interceptor组成的拦截链。
第一个Interceptor会在消息发送前将时间戳信息加到消息value的前面
第二个Interceptor在消息发送后更新成功发送消息数或失败发送消息数。
第一个拦截器示例
package com.wunaiieq;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class TimeStampInterceptor implements ProducerInterceptor<String,String> {
/**初始化拦截器,并接收Kafka生产者的配置参数。
* */
@Override
public void configure(Map<String, ?> configs) {
}
/**发送之前被调用,对消息进行处理。
* */
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<String, String>(
//原始消息记录的主题、分区、时间戳和键。
record.topic(),record.partition(),
record.timestamp(), record.key(),
//将当前系统时间戳(System.currentTimeMillis())和原始消息值拼接成新的消息值,中间用逗号分隔。
System.currentTimeMillis()+","+record.value());
}
/**消息发送成功或失败后被调用。
* */
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
/**在拦截器不再使用时进行资源清理。
* */
@Override
public void close() {
}
}
第二个拦截器示例
package com.wunaiieq;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class CounterIntercepter implements ProducerInterceptor<String,String> {
private int errorCounter = 0;
private int successCounter = 0;
/**onSend方法,该方法在消息发送之前被调用,用于对消息进行处理。
* 由于这是第二个拦截器,因此这里接受的是前一个拦截器的输出
* */
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
/**在消息发送成功或失败后被调用。
* 统计消息发送成功或失败的数量
* */
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if(exception==null){
successCounter++;
}else{
errorCounter++;
}
}
/**拦截器关闭时会进行的额外操作
* 打印成功或失败的消息数量
* */
@Override
public void close() {
System.out.println("successful send:"+successCounter);
System.out.println("failed send:"+errorCounter);
}
@Override
public void configure(Map<String, ?> configs) {
}
}
拦截器调用
package com.wunaiieq;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SyncCustomProducerInterceptor {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 构造拦截器链
List<String> interceptors = new ArrayList<>();
interceptors.add("com.wunaiieq.TimeStampInterceptor");
interceptors.add("com.wunaiieq.CounterIntercepter");
//配置拦截器链(将拦截器链加入到配置文件中)
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
KafkaProducer<String, String> producer =
new KafkaProducer<String, String>(prop);
for (int i = 5; i < 10; i++) {
//同步发送消息
producer.send(new ProducerRecord<>("topicA", "sync_msg" + i)).get();
}
//一定要关闭生产者,这样才会调用interceptor的close方法
producer.close();
}
}
效果
消息序列化
描述
消息序列化是将对象转换为字节流的过程。在Kafka中,生产者需要将消息对象序列化为字节流,以便通过网络发送给Kafka集群;而消费者则需要将从Kafka集群接收到的字节流反序列化为对象,以便进行后续处理。
代码示例(自定义序列化)
pom.xml
增加依赖
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
UserVo.java
值对象
package com.wunaiieq;
public class UserVo {
private String name;
private int age;
private String address;
public UserVo(String name, int age, String address) {
this.name = name;
this.age = age;
this.address = address;
}
@Override
public String toString() {
return "UserVo{" +
"name='" + name + '\'' +
", age=" + age +
", address='" + address + '\'' +
'}';
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
}
UserSerializer.java
重写Serializer接口实现序列化操作
package com.wunaiieq;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
public class UserSerializer implements Serializer<UserVo> {
private ObjectMapper objectMapper;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
objectMapper = new ObjectMapper();
//Serializer.super.configure(configs, isKey);
}
/**
* @param topic 消息将要发送到的主题名
* @param data 需要序列化的UserVo对象。
* */
@Override
public byte[] serialize(String topic, UserVo data) {
//存储序列化后的字节数组
byte[] ret = null;
try {
//data写成JSON字符串再写成UTF_8的字节数组
ret = objectMapper.writeValueAsString(data)
.getBytes(StandardCharsets.UTF_8);
} catch (IOException e) {
throw new SerializationException("Error when serializing UserVo to byte[],exception is " + e.getMessage());
}
return ret;
}
@Override
public void close() {
objectMapper = null;
//Serializer.super.close();
}
}
UserSerProducer.java
调用自定义序列化机制
package com.wunaiieq;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class UserSerProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
// TODO 不用修改key的序列化机制,后续没用到
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// TODO 修改value的序列化机制
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
KafkaProducer<String,UserVo> producer = new KafkaProducer<String, UserVo>(prop);
UserVo userVo = new UserVo("wunaiieq",18,"北京");
producer.send(
// TODO 关于消息记录的构造中,可以指定 1.主题、值 2.主题、键、值
new ProducerRecord<String,UserVo>("topicA", userVo),
new Callback() {
//如下方法在生产者收到acks确认时异步调用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
//无异常信息,输出主题和分区信息到控制台
System.out.println("topic:"+recordMetadata.topic()
+",partition:"+recordMetadata.partition());
}else{//打印异常信息
System.out.println(e.getMessage());
}
}
});
Thread.sleep(50);
producer.close();
}
}
效果
前面的不用管,只是没清空而已
分区
描述
分区位于拦截器链后面
生产者分区的优势:
-
便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
-
提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
-
分区后,更方便于做副本备份,提高了数据安全性。
分区策略
以下为提供的默认分区策略,自行选择即可
- 轮询策略(Round-Robin Strategy)
原理:按照顺序将消息发送到不同的分区,每个消息被发送到其对应分区,循环轮询每个分区,确保消息在所有分区之间均匀分布。
特点:适用于生产者不需要根据消息内容或键选择特定分区的场景,能够实现负载均衡,最大限度地利用集群资源。
默认情况:这是Kafka Java生产者API默认提供的分区策略。如果没有指定分区策略,则会默认使用轮询策略。 - 按键分配策略(Key-Based Partitioning)
原理:消息的键被用作决定消息分区的依据。生产者将消息的键发送给Kafka,Kafka根据键的哈希值将消息路由到相应的分区。
特点:适用于键值对的数据结构,通过将具有相同键的消息发送到同一分区,可以提高数据局部性和处理效率。同时,能够保证具有相同键的消息顺序性。 - 范围分区策略(Range Partitioning)
原理:根据消息键的范围将消息分配到不同的分区。每个分区包含一个键值范围内的消息。
特点:适用于有序数据的处理,如时间戳或递增的ID。通过将具有相似时间戳或递增ID的消息分配到同一分区,可以提高处理效率并保证数据的顺序性。 - 自定义分区策略(Custom Partitioning)
原理:用户可以根据特定的业务逻辑或规则来决定消息的分区。通过实现自定义的分区器类,根据应用程序的需求来定义分区的逻辑。
特点:提供了更高的灵活性,可以根据地理位置、用户ID或其他业务规则来决定消息的分区。
实现方式:实现org.apache.kafka.clients.producer.Partitioner接口,并重写partition、close和configure方法。其中,partition方法是核心,用于根据给定的键、值和分区信息来计算分区号。 - 粘性分区策略(Sticky Partitioning)
原理:尽可能将消息分配到与之前消息相同的分区,以减少跨分区的数据移动和复制。通过维护一个分区和消费者的映射关系来实现。
特点:在消费者组或分区数量发生变化时,能够尽可能减少对现有分区分配的影响,减少负载均衡的开销,提高处理效率。
代码示例
写入默认分区(0号分区)
消息只会发送到指定的分区内部
package com.wunaiieq.partition;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerToPartition {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//TODO 1.声明并实例化Kafka Producer的配置文件对象
Properties prop = new Properties();
//TODO 2.为配置文件对象设置参数
// 2.1 配置bootstrap_servers
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
// 2.2 配置key和value的序列化类
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//TODO 3.声明并实例化生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
//TODO 4.发送消息
for(int i = 0;i<5;i++){
//指定数据发送到0号分区,key为null
producer.send(new ProducerRecord<>("topicA",0,null, "unsync_msg" + i),
new Callback() {
//如下方法在生产者收到acks确认时异步调用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
//无异常信息,输出主题和分区信息到控制台
System.out.println("topic:"+recordMetadata.topic()
+",partition:"+recordMetadata.partition());
}else{//打印异常信息
System.out.println(e.getMessage());
}
}
});
Thread.sleep(5);
}
//TODO 5.关闭生产者
producer.close();
}
}
自定义分区机制
部分消息可能需要额外的处理内容,比如审计等等,这类消息的key会携带关键字符串“wunaiieq”,现在让其发送到topicA主题的最后一个分区上,以便于后续处理,其他的消息则随机发送(不包括最后一个分区)
WunaiieqPartitioner.java
分区写入策略
package com.wunaiieq.partition;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class WunaiieqPartitioner implements Partitioner {
private Random random;
@Override
public void configure(Map<String, ?> configs) {
//该方法实现必要资源的初始化工作
random = new Random();
}
/** 计算信息对应的分区
* @param topic 主题
* @param key 消息的key
* @param keyBytes 消息的key序列化后的字节数组
* @param value 消息的value
* @param valueBytes 消息value序列化后的字节数组
* @param cluster 集群元数据 可以获取分区信息
* @return 息对应的分区号
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//将key转换为字符串
String keyInfo = (String)key;
//获取主题的分区对象列表
List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
//获取主题下的分区总数量
int partCount = partitionInfoList.size();
if (partCount <= 1) {
System.out.println("1 partition");
return 0; // 只有一个分区时,直接返回0
}
//最后一个分区号
int wunaiieqPartition = partCount-1;
//如果 key 为空、key 为空字符串或 key 不包含 "wunaiieq",则随机选择一个除最后一个分区外的分区;否则,消息发送到最后一个分区。
return keyInfo==null || keyInfo.isEmpty()
||!keyInfo.contains("wunaiieq")
? random.nextInt(partCount-1) : wunaiieqPartition ;
}
@Override
public void close() {
//该方法实现必要资源的清理工作
random = null;
}
}
CustomPartitionerProducer.java
调用分区策略
package com.wunaiieq.partition;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomPartitionerProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// TODO 在用于构造KafkaProducer的Properties对象中设置partitioner.class参数
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
"com.wunaiieq.partition.WunaiieqPartitioner");
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
for(int i = 0;i<5;i++){
// TODO 不指定分区号,key为"wunaiieq"测试运行一次,改为"kafka"后再测试一次。
producer.send(new ProducerRecord<>("topicA","aa", "unsync_msg" + i),
new Callback() {
//如下方法在生产者收到acks确认时异步调用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
//无异常信息,输出主题和分区信息到控制台
System.out.println("topic:"+recordMetadata.topic()
+",partition:"+recordMetadata.partition());
}else{//打印异常信息
System.out.println(e.getMessage());
}
}
});
Thread.sleep(5);
}
producer.close();
}
}
消息丢失
判断消息丢失时,一般看应答机制
- acks=0:因为不需要等待leader数据持久化就完成应答,leader宕机后可能存在数据丢失(follower内部数据从leader中同步,leader没有---->>follower也没有)
- acks=1:此时leader持久化完成应答(但是follower可能没有完成数据同步,leader宕机,导致数据丢失)一般用于传输普通日志
- acks=all(或acks=-1):leader和follower都持久化后并回应,才认为消息发送成功。这种方式性能最低,但可靠性最高。传输重要数据。
特殊情况
在acks=-1或all的情况下,Leader接收到数据并持久化后,所有Follower开始同步Leader刚刚持久化的数据,但是有一个Follower因故障迟迟不能进行数据同步,该问题应该怎么解决?
Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。
如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。
该时间阈值由replica.lag.time.max.ms参数设定,默认30000ms。例如1超时,(leader:0, isr:0,2)。这样就不用等长期联系不上或者已经故障的节点。
消息绝对不丢失的条件
- ACK级别设置为-1
- 分区副本>=2
- ISR应答的最小副本数>=2 (最小副本数有min.insync.replicas设置,默认为1)
生产中配置响应级别代码块:
// 设置 acks
prop.put(ProducerConfig.ACKS_CONFIG, "all");
//重试次数 retries,默认是 int 最大值,2147483647
prop.put(ProducerConfig.RETRIES_CONFIG, 3);
数据去重
描述
数据重复的原因
- 当生产者发送消息到Kafka集群时,如果由于网络故障或Kafka Broker的临时问题导致消息发送失败,生产者通常会进行重试。如果重试时Kafka Broker已经成功处理了之前的消息但尚未发送确认(ACK),那么重试发送的消息就会导致数据重复。
- 网络延迟或不稳定可能导致消息发送失败,生产者会进行重试,从而增加消息重复的风险。
- 如果Kafka Broker在消息发送成功后崩溃,但在发送确认(ACK)之前崩溃,生产者可能会重试发送相同的消息,导致消息重复。
数据去重
-
至少一次(At Least Once):ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2(保证数据绝对不丢失,但不能保证数据不重复)
-
最多一次(At Most Once):ACK级别设置为0(保证数据绝对不重复,但不能保证数据不丢失)
-
精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
- 精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
- 重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。所以幂等性只能保证的是在单分区单会话内不重复。
使用幂等性的使用
开启参数 enable.idempotence 默认为 true,false 关闭。
事务
- 寻找事务协调器:KafkaProducer 使用 trans.id 寻找事务协调器 (Transaction Coordinator)。
- 协调器通过 broker0 返回事务协调器的地址,包括事务信息的主题和分区领导者 (transaction_state-分区-Leader)。
- 请求 partId(开启幂等性): KafkaProducer向事务协调器请求 partId,并开启幂等性。
- 事务协调器接收到请求后,将请求持久化,并返回 partId给 KafkaProducer。
- 发送消息: KafkaProducer 使用返回的 partId 发送消息到指定主题的指定分区(topicA-Partition0 或 topicA-Partition1)。
6.发送 commit 请求: KafkaProducer发送 commit 请求到事务协调器,以提交事务。
7.事务协调器接收到 commit 请求后,将其持久化。- 事务成功:事务协调器确认事务成功,并返回成功信息给 KafkaProducer。
代码示例(事务)
package com.wunaiieq;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTransaction {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//TODO 设置事务id
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"transaction_id_topicA_0");
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
//TODO 初始化事务
producer.initTransactions();
//TODO 开启事务
producer.beginTransaction();
//TODO 添加异常处理,成功提交事务,失败回滚事务
try {
//发送消息
for (int i = 0; i < 5; i++) {
//同步发送消息
producer.send(new ProducerRecord<>("topicA", "sync_msg" + i)).get();
}
//TODO 提交事务
producer.commitTransaction();
}catch (Exception e){
//TODO 放弃事务
producer.abortTransaction();
}
producer.close();
}
}