2024/9/10黑马头条跟学笔记(六)
D6
1.今日学习内容
1.1需求分析
点击下架之后,app端显示以下架
耦合,没技术点,不用,咱用kafka,流量削峰,异步调用,解耦
为什么要学?
- 面时提问
2.kafka概述
rabbitMQ,erlang语言,扩展麻烦
kafka,高吞吐
rocketMQ,稳定 11.11拷打过的
rabbitMQ 性能好,社区活跃
消费者和生产者,中间环境流式传输,和DB数据库,由卡夫卡集群统一管理
2.1名词介绍
topic,kafka消息的key值
一个服务器一个broker,多个broker组成cluster集群
3.kafka环境搭建
安装zookeeper
离不开zookeeper ,其保存了kafka节点数据
镜像下载
docker pull zookeeper:3.4.14
docker run -d --name zookeeper -p 2181:2181 zookeeper
安装kafka
镜像
docker pull wurstmeister/kafka:2.12-2.3.1
容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.233.136 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.233.136:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.233.136:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
服务器ip
连接zookeeper ip+端口
暴露9092
最大最小内存256M
网络空间为 host也就是宿主机,省写了-p 9092:9092 表示了容器的网络就是宿主机的网络,没分内外网络端口之分
不过云主机就得写端口映射了,容器和云主机网络隔离了
4.kafka入门案例
步骤
依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
生产者发送
package com.heima.kafka.sample;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* 生产者
*/
@Slf4j
public class ProducerQuickStart {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.kafka链接配置信息
Properties prop = new Properties();
//kafka链接地址
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.233.136:9092");
//key和value的序列化
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//2.创建kafka生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);
//3.发送record消息
/**
* 第一个参数 :topic
* 第二个参数:消息的key
* 第三个参数:消息的value
*/
ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");
producer.send(kvProducerRecord);
System.out.println("发送了消息");
//4.关闭消息通道 必须要关闭,否则消息发送不成功
producer.close();
}
}
- 配置map,填服务器地址
- 由于需要网络传输,设置k,v序列化器
- 创建生成者对象,填入配置map
- 发送消息, 参数1:队列名 2:键 3:值
- send发送后需要 close关闭,否则消息发送不成功(有点类似流式传输文件)
消费者接收
package com.heima.kafka.sample;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 消费者
*/
@Slf4j
public class ConsumerQuickStart {
public static void main(String[] args) {
// 1.kafka的配置信息
Properties prop = new Properties();
// 链接地址
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.136:9092");
// key和value的反序列化器
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 设置消费者组
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
// 2.创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
// 3.订阅主题
consumer.subscribe(Collections.singletonList("topic-first"));
System.out.println("等待消息");
// 4.拉取消息
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
}
}
}
}
- 配置消息,序列化器
- 消费者组(与生产者不同之处)
- 消费者对象,订阅主题,频率每1秒拉取1次,while循环
- 拉取的了多个map值循环遍历出key和value
由于订阅主题参数为集合,我们将字符串转为集合
consumer.subscribe(Collections.singletonList(“topic-first”));
启动测试
先运行消费者
多个订阅者订阅一个主题。一个消费者消费(1对1)
复制一份服务。发送消息测试。可知二者在同一个组下,只有一个消费者会得到消息
1对多
一个主题多个消费者接收。每个消费者单独设置一个group
将改为group2然后启动
二者都能接收到消息
类似于报纸,什么专栏topic,下面的什么标题key,对应的什么内容value,group相当于订阅人,老王家订阅了一份group1,就发报纸给老王家,如果老王家的孩子多订阅了一份group2,那么老王和老王孩子都能收到消息
5.kafka分区机制
类似于文件夹,且分区文件夹可以在不同的机器上,防止topic存储量过大爆满borker(经纪人)
topic剖析
将topic分到三个区域,持续添加,每个消息都有一个序列号,称为偏移量,且唯一
分区策略
轮询,消息顺序发放各个分区,
随机,顾名思义,
按键,数据指定key,计算该key的hashCode,根据该hashcode对不同消息存储
不指定key和分区就是轮询,指定分区和key,就按指定分区和key塞数据
输出分区尝尝咸淡
指定key指定分区尝尝咸淡
不指定key和分区,按轮询发送尝尝咸淡
6.kafka高可用设计
备份机制
有什么用?
生产者发送消息给集群,通常由leader备份转发数据,leader下有很多小兵,同时进行备份数据
同步方式
分为同步ISR 小兵队列 (in sync replica)和普通的异步备份 小兵队列
leader备份大哥宕机后,从ISR小兵队列中挑选新的老大,当ISR同步小兵队列里的也全宕机,使用普通异步备份队列中的小兵作为leader
两种选择方案
如果想要保证数据一致性,等待ISR的follower 重连
如果想要保证数据的可用性(死马当活马医) 就直接用普通队列里的任意一个
7.kafka生产者详解
7.1发送类型
7.1.2同步
send方法发送,返回future对象。调用get方法同步等待recordMetadata结果, 输出offset可以获取id偏移量
7.1.3异步
7.2发送测试
同步
// 同步消息发送
RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());
System.out.println("发送了消息");
传递参数和回调函数,当发送结束后,判断发送状态,失败后抛出异常,可拿该异常记录日志(数据库insert)
该偏移量为连续自增的一个数据
异步
不过如果每次发送消息都会等待,100000条数据则造成阻塞,在这可以使用异步看看
// 异步消息发送
producer.send(kvProducerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println("记录异常消息到日志表中");
}
System.out.println("发送了消息,偏移量为:"+recordMetadata.offset());
}
});
推荐异步
7.3参数配置
ack
全称为 acknowledgement 在计网和通信中,指的是一种确认信号,用于确认对方是否接收到数据
acks=0表示 发消息,管你收没收到给没给我响应,直接执行下一条代码 类似于异步, 速度快,风险高
acks=1 默认值 集群首领收到信息,就会收到来自kafka服务器的成功响应
acks=all 集群首领,所有follower收到信息,才能收到服务器成功响应
设置配置
// 设置acks配置
prop.put(ProducerConfig.ACKS_CONFIG, "all");
retries
收到的错误可能是临时性的,retries设置重发次数,达到最大值不重试了。类似tg, 默认每次重试间隔100ms
设置
//设置超时重传次数
prop.put(ProducerConfig.RETRIES_CONFIG, "10");
信息压缩
算法 | cpu占用 | 压缩比 | 性能速度 |
---|---|---|---|
snappy | 少 | 好 | 好 |
lz4 | 少 | 好 | 好 |
gzip | 多 | 更好 | 慢 |
总结,用snappy
// 设置消息压缩算法
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
8.消费者详解
8.1消费者组
8.2有序性
先买了个包子,再买了个苹果,其消费短信发送必须有顺序
topic分区消息只能保证,该分区下顺序处理,超出该分区分配到其他消费者无法确认消息的先后顺序
例如p0分区对应c1,c3,只能在该p0 从c1到c3,
p3分区只能c1,c4,
如果想要顺序处理topic消息,只能提供一个分区 ,
如右图,偏移量按顺序递增,消息按顺序发送
8.3提交和偏移量
不像rabbitMQ有确认机制,刚才那个是生产者与服务器的响应,现在是消费者的响应,没有确认机制
那该如何确认消息被消费了?
根据偏移量来确认消费到了哪个位置,由位置确定消息有无被消费
一旦消费,自动往_consumer_offset特殊主题发送消息记录日志,包含每个分区的偏移量
当消费者2崩溃了,其负责的分区均衡的分配给其他消费者
一个消费者组内只有一个消费者消费消息
生产者按分区发布消息时,和该分区绑定的消费者消费信息
如果和该分区绑定的消费者宕机了,则空闲的分区由其余消费者消费
b友的解释
拉取到数据就提交offset,业务未执行到该环节就宕机了,也好比美团接了10单,到送完了5单,但是后面有事不送了,剩下五单就没得吃了饿死了
个人看法
每个五秒更新偏移量,第四秒宕机后,白白消费,白白执行了一堆代码,后续的消费者接收该烂摊子又得从头开始执行业务代码
8.4手动提交偏移量三种方式
8.4.1同步提交
8.4.2异步提交
8.4.3同步异步组合提交
异步没有重试机制,同步直至重试结束才下一段代码
多个异步的重试可能导致位移覆盖,无阻塞,位移抢占问题
二者结合一下,同步可以重试,异步不会阻塞,那就让异步的异常转为同步提交,这样一来,当异步任务异常后,也能进行重传
8.5代码环节
同步
光说不练假把式,上代码
设置手动提交配置
// 设置手动提交
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
同步提交,在消费后添加该代码
try {
consumer.commitSync();
} catch (Exception e) {
System.out.println("提交失败,异常为:"+e);
}
发送测试
b友发言
这个是为了解决分区的问题,一个消费者可能有多个分区,在有消费者异常后kafka进行均衡给消费者新增一个分区,要处理的额就是这个新加分区的偏移量问题
也就是说手动提交解决分区偏移量偏差问题(他为什么非要五秒更新偏移量啊,时时刻刻不行吗,这样你执行完业务逻辑并且更新偏移量不就不会出现问题了)
手动更新的好处就是可以控制更新偏移量的时机,这样一来,消费者A接收B的烂摊子时,其偏移量不会出现偏差导致重复执行任务
异步
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null)
System.out.println("提交失败,异常为:" + e);
System.out.println("提交错误的偏移量:" + map);
}
});
重新发送测试偏移量提交情况
同步+异步
原来的注释掉写这个
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
System.out.println(consumerRecord.offset());
System.out.println(consumerRecord.partition());
}
//执行异步操作,防止阻塞,异常交给下面的同步操作
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("记录错误信息:" + e);
} finally {
//执行同步操作,方便后续重传
consumer.commitSync();
}
测试能否正确接收偏移量
正常接收
小疑问
finally 里也有提交,那我会提交两次??? 又异步又同步??
nonono,while 死循环了,出不来,要出来的话那异步就无了,执行同步操作,至始至终只有一种提交偏移量方式被采用
9.springBoot集成kafka收发消息
依赖
在原来的kafka-demo模块下补充
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
配置文件
server:
port: 9991
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 192.168.200.130:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
controller
package com.heima.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("/hello")
public String hello(){
kafkaTemplate.send("itcast-topic","黑马程序员");
// User user = new User();
// user.setUsername("xiaowang");
// user.setAge(18);
// kafkaTemplate.send("user-topic", JSON.toJSONString(user));
return "ok";
}
}
接收者类
package com.heima.kafka.listener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class HelloListener {
@KafkaListener(topics = "user-topic")
public void onMessage(String message){
System.out.println(message);
// if(!StringUtils.isEmpty(message)){
// User user = JSON.parseObject(message, User.class);
// System.out.println(user);
// }
}
}
- @KafkaListener(topics = “user-topic”)
断点调试
传递对象为对象
方式一,自定义序列化器,对象多通用性不强,不介绍
方式二,发送时转JSON字符串,然后再json字符串转为对象,指定类型 有点像前端传递数据
user类
package com.heima.kafka.pojo;
public class User {
private String username;
private Integer age;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"username='" + username + '\'' +
", age=" + age +
'}';
}
}
把之前的注释给注释回来,重启发请求
10.自媒体文章上下架
当前环节仅仅写了业务修改自媒体的上下架状态,没有涉及消息发送
需求
流程图
思路
- id查文章,查发布状态,如果都通过检测那就修改 enable状态为下架
- 同时发消息kafka携带articleId,
- 此时article端处于监听消息状态(while循环或者springboot集成),然后收到消息后执行修改is Down字段下架操作
这种消息队列就是传递了 ID值,指定了topic表示了什么业务类型的操作,然后由收消息方书写业务逻辑,好比老师叫你去黑板写题, 小明同学(articleId),到黑板写题(answerQuestion)
接口定义
设置一个dto或者在原有的dto追加enable字段,
这里进行追加
dto
/**
* 上下架 0 下架 1 上架
*/
private Short enable;
service
WmNewsService
/**
* 文章的上下架
* @param dto
* @return
*/
public ResponseResult downOrUp(WmNewsDto dto);
impl
先自己按照思路敲一遍
(复制过去,然后翻译代码)
- 检测dto参数
- 查文章,判断不为空,和状态为已发布
- 修改自媒体文章数据库 该enable状态
- 判断articleId是否为空(发布到app端返回的那个)
不为空则发消息,参数为topic和map(id和enable状态) - 一切到位后返回成功结果对象
注意这里是lambdaUpdate而不是lambdaQuery
@Override
public ResponseResult downOrUp(WmNewsDto dto) {
// 1. 检测dto的id参数
if (dto.getId() == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
// 2. 查文章,判断不为空,和状态为已发布
WmNews wmNews = getById(dto.getId());
if (wmNews == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST, "文章不存在");
}
if (!wmNews.getStatus().equals((WmNews.Status.PUBLISHED.getCode()))) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "文章不是已发布状态");
}
// 3. 判断dto中的上下架状态是否符合规范,修改自媒体文章数据库 该enable状态
if (dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2) {
update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable, dto.getEnable())
.eq(WmNews::getId, wmNews.getId()));
}
// 5. 一切到位后返回成功结果对象
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
controller
@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
return wmNewsService.downOrUp(dto);
}
启动项目,前后端联调
11.通知app端下架
导入依赖
heima-common模块下
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
nacos中自媒体配置文件
生产者,重试次数,序列化器,服务器
spring:
kafka:
bootstrap-servers: 192.168.233.136:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
自媒体上下架后发布
// 4. 判断articleId是否为空(发布到app端返回的那个)
// 不为空则发消息,参数为topic和map(id和enable状态)
if (wmNews.getArticleId() != null) {
// 5. 构建map,存放id和状态
Map<String, Object> map = new HashMap<>();
map.put("articleId", wmNews.getArticleId());
map.put("enable", dto.getEnable());
// 6. 发送消息
kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC, JSON.toJSONString(map));
}
常量
public class WmNewsMessageConstants {
public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}
article端的nacos配置yml
spring:
kafka:
bootstrap-servers: 192.168.233.136:9092
consumer:
group-id: ${spring.application.name}
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
app服务进行监听
和刚才大差不差,一个注解话题名, 该注解使得该方法一直处于监听状态,当有消息传达过来时判断是否为空,然后转对象(简单的json序列化,json字符串到object对象)
然后调用方法进行配置表修改状态
package com.heima.article.listener;
import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleConfigService;
import com.heima.common.constants.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@Slf4j
public class ArticleIsDownListener {
@Autowired
private ApArticleConfigService apArticleConfigService;
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void onMessage(String message){
if(StringUtils.isNotBlank(message)){
Map map = JSON.parseObject(message, Map.class);
apArticleConfigService.updateByMap(map);
}
}
}
apArticle Config的service和impl
改表的数据
service
package com.heima.article.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.heima.model.article.pojos.ApArticleConfig;
import java.util.Map;
public interface ApArticleConfigService extends IService<ApArticleConfig> {
/**
* 修改文章
* @param map
*/
void updateByMap(Map map);
}
impl
正常思维都是初始一个isDown,1为false 0为true
我们也可以初始化isDown 为true,是否下架,当为1时修改就行,不用 else
默认值思想
以下代码加了事务注解是为了未来扩展方便
5. 一致的编程风格
- 在服务实现中使用事务可以保持一致的编程风格,确保所有数据库操作都遵循相同的事务管理策略。
package com.heima.article.service.impl;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.heima.article.mapper.ApArticleConfigMapper;
import com.heima.article.service.ApArticleConfigService;
import com.heima.model.article.pojos.ApArticleConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Map;
@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {
/**
* 修改文章
*
* @param map
*/
@Override
public void updateByMap(Map map) {
// 判断enable的数值
// 0下架,1上架
// 六将思维
boolean isDown=true;
Object enable = map.get("enable");
if (enable.equals(1)) {
isDown = false;
}
//修改文章
update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId"))
.set(ApArticleConfig::getIsDown,isDown));
}
}
启动项目进行测试
点击下架
变为0,我们复制该 articleId到 aparticle数据库的config表查询是否isDown为1
成功发送消息给ap端并且下架文章
总结
修改状态,修改自媒体的状态,然后再发消息给ap端修改ap端数据库的上下架状态(article前提得接收)
kafka,
发送端指定话题,然后发送消息
接收端循环监听消息来源
发送端有ask机制,接收端没有,因此得手动提交偏移量
但是默认为自动添加偏移量,当有多个消费者负责不同分区,一个消费者崩溃后其他消费者接收该消费者的烂摊子,由于该消费者处理了任务但是还没来得及提交偏移量,因此其他消费者重复执行消息所代表的业务
因此我们使用了异步+同步的方式提交偏移量,避免重复执行操作
接下来我们用了springboot集成的kafka步骤如下
接收端和发送端各自配置好配置文件(存放在nacos配置中心)
发送端调用 springboot集成的生产者对象发送,设置好topic和内容 一般使用map记录(转为字符串) 消息类型和消息传达到哪个人和该做什么事,从而那个人收到消息后做出相应动作
接收端加注解,指定监听什么话题,形参为字符串,判断消息是否为空,不为空则调用方法做出相应动作
二次总结, 一个发送,一个接收,中间通过配置文件指定kafka中心在哪台服务器上 ,配置相关配置(固定的例如序列化器,重试次数,组id,这里同步异步提交偏移量应该集成好了,不用手动设置了)