springboot整合kafka
Springboot整合Kafka
1.下载kafka
启动
1.zookeeper-server-start.bat ..\..\config\zookeeper.properties
2.kafka-server-start.bat ..\..\config\server.properties
3.kafka-server-stop.bat
4.zookeeper-server-stop.bat
2.引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3.消息实体
package com.example.kafka;
import java.util.Date;
public class KafkaMessage
{
private long id;
private String username;
private String password;
private Date date;
public long getId()
{
return id;
}
public void setId(long id)
{
this.id = id;
}
public String getUsername()
{
return username;
}
public void setUsername(String username)
{
this.username = username;
}
public String getPassword()
{
return password;
}
public void setPassword(String password)
{
this.password = password;
}
public Date getDate()
{
return date;
}
public void setDate(Date date)
{
this.date = date;
}
}
4.生产者
package com.example.kafka;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer
{
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
public KafkaTemplate<String, String> getKafkaTemplate()
{
return kafkaTemplate;
}
public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate)
{
this.kafkaTemplate = kafkaTemplate;
}
public void sendKafkaMessage(KafkaMessage message)
{
kafkaTemplate.send("myTopic", JSONObject.toJSONString(message));
}
}
5.消费者
package com.example.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer
{
@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void obtainMessage(ConsumerRecord<String, String> consumerRecord)
{
System.out.println("obtainMessage invoked");
String topic = consumerRecord.topic();
String key = consumerRecord.key();
String value = consumerRecord.value();
long timestamp = consumerRecord.timestamp();
int partition = consumerRecord.partition();
System.out.println("topic:" + topic);
System.out.println("key:" + key);
System.out.println("value:" + value);
System.out.println("timestamp:" + timestamp);
System.out.println("partition:" + partition);
System.out.println("=======================");
}
}
5.controller测试
package com.example.controller;
import com.example.kafka.KafkaMessage;
import com.example.kafka.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.text.SimpleDateFormat;
import java.util.Date;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController
{
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private KafkaProducer kafkaProducer;
@RequestMapping(value = "message", method = RequestMethod.GET)
public KafkaMessage sendKafkaMessage(@RequestParam(name = "id") long id, @RequestParam(name = "username") String username,
@RequestParam(name = "password") String password)
{
KafkaMessage kafkaMessage = new KafkaMessage();
kafkaMessage.setId(id);
kafkaMessage.setUsername(username);
kafkaMessage.setPassword(password);
kafkaMessage.setDate(new Date());
kafkaProducer.sendKafkaMessage(kafkaMessage);
return kafkaMessage;
}
}