kafka快速上手
一、kafka介绍
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。
kafka官网:http://kafka.apach e.org/
二、kafka入门
- 生产者发送消息,多个消费者只能有一个消费者接收到消息
- 生产者发送消息,多个消费者都可以接收到消息
步骤如下:
(1)创建kafka-demo项目,导入依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
(2)生产者发送消息
public class ProducerQuickStart {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.kafka链接配置信息
Properties prop = new Properties();
//kafka链接地址
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
//key和value的序列化
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//2.创建kafka生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);
//3.发送消息
/**
* 第一个参数 :topic
* 第二个参数:消息的key
* 第三个参数:消息的value
*/
ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");
//同步发送消息
producer.send(kvProducerRecord);
//4.关闭消息通道 必须要关闭,否则消息发送不成功
producer.close();
}
}
(3)消费者接收消息
public class ConsumerQuickStart {
public static void main(String[] args) {
//1.kafka的配置信息
Properties prop = new Properties();
//链接地址
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//key和value的反序列化器
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//设置消费者组
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
//2.创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
//3.订阅主题
consumer.subscribe(Collections.singletonList("topic-first"));
//4.拉取消息
while (true) {
// 读取数据,读取超时时间为100ms ,即每个1000ms拉取一次
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
}
}
}
}
使用情景:
生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息 (一对一)
生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息 (一对多)
三、springboot集成kafka
1.导入spring-kafka依赖信息
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- kafkfa --> <!-- Spring Kafka的依赖,排除了kafka-clients --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <!-- 显式声明kafka-clients的依赖 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency> </dependencies>
2.在resources下创建文件application.yml
server: port: 9991 spring: application: name: kafka-demo kafka: bootstrap-servers: localhost:9092 producer: retries: 10 #重试的次数 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.消息生产者
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("/hello")
public String hello(){
kafkaTemplate.send("test-topic","HelloWorld");
// User user=new User();
// user.setUsername("lili");
// user.setAge(18);
// kafkaTemplate.send("test-topic", JSON.toJSONString(user));
return "ok";
}
}
4.消息消费者
@Component
public class HelloListener {
@KafkaListener(topics = "test-topic")
public void onMessage(String message){
if (!StringUtils.isEmpty(message)){
System.out.println(message);
// User user = JSON.parseObject(message, User.class);
// System.out.println(user);
}
}
}
传递消息为对象:
目前springboot整合后的kafka,因为序列化器是StringSerializer,可以把要传递的对象进行转json字符串,接收消息后再转为对象即可