监听RabbitMQ,向Elasticsearch 创建索引
上一篇通过配置Canal+MQ的数据同步环境,实现了Canal从数据库读取binlog并且将数据写入MQ。
下边编写程序监听MQ,收到消息后向ES创建索引。
1. 环境准备
- RabbitMQ:已配置并运行,Canal 已将 MySQL 的
binlog
消息发送到 RabbitMQ 队列中。 - Elasticsearch:已运行,并可以通过 REST API 访问。
- Spring Boot 项目:通过
spring-boot-starter-amqp
和spring-boot-starter-data-elasticsearch
集成 RabbitMQ 和 Elasticsearch。
2. 添加依赖
在 Spring Boot 项目的 pom.xml
中,添加 RabbitMQ 和 Elasticsearch 的依赖:
<dependencies>
<!-- RabbitMQ Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Elasticsearch Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
</dependencies>
3. 配置 application.yml
配置文件中需要包含 RabbitMQ 和 Elasticsearch 的相关信息:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual # 手动ACK
elasticsearch:
rest:
uris: http://localhost:9200
connection-timeout: 1000
socket-timeout: 30000
4. 创建 Elasticsearch 索引
定义一个实体类来映射 Elasticsearch 中的索引:
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
@Document(indexName = "user_index")
public class User {
@Id
private Long id;
private String name;
private String email;
// getters and setters
}
5. 编写 Elasticsearch 的 Repository
创建一个 Elasticsearch 的 Repository
,用于保存数据到索引中:
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
public interface UserRepository extends ElasticsearchRepository<User, Long> {
}
6. 监听 RabbitMQ 队列并同步到 Elasticsearch
创建一个 RabbitMQ 消息监听器,收到消息后解析并将数据存储到 Elasticsearch:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class CanalToEsSyncService {
@Autowired
private UserRepository userRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
private ObjectMapper objectMapper = new ObjectMapper();
// 监听 Canal 发送到 RabbitMQ 的队列消息
@RabbitListener(queues = "example_queue")
public void handleCanalMessage(String message) {
try {
// 假设 Canal 发送的是 JSON 格式的消息,可以用 Jackson 解析
User user = objectMapper.readValue(message, User.class);
// 将解析的 User 对象存入 Elasticsearch
userRepository.save(user);
// 手动确认消息已成功消费
System.out.println("Message processed and indexed to Elasticsearch: " + user);
} catch (Exception e) {
e.printStackTrace();
// 处理失败,可以记录日志或根据需要重试
}
}
}