当前位置: 首页 > article >正文

监听RabbitMQ,向Elasticsearch 创建索引

上一篇通过配置Canal+MQ的数据同步环境,实现了Canal从数据库读取binlog并且将数据写入MQ。

下边编写程序监听MQ,收到消息后向ES创建索引。

1. 环境准备

  • RabbitMQ:已配置并运行,Canal 已将 MySQL 的 binlog 消息发送到 RabbitMQ 队列中。
  • Elasticsearch:已运行,并可以通过 REST API 访问。
  • Spring Boot 项目:通过 spring-boot-starter-amqpspring-boot-starter-data-elasticsearch 集成 RabbitMQ 和 Elasticsearch。

2. 添加依赖

在 Spring Boot 项目的 pom.xml 中,添加 RabbitMQ 和 Elasticsearch 的依赖:

<dependencies>
    <!-- RabbitMQ Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    <!-- Elasticsearch Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
</dependencies>

3. 配置 application.yml

配置文件中需要包含 RabbitMQ 和 Elasticsearch 的相关信息:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual   # 手动ACK

  elasticsearch:
    rest:
      uris: http://localhost:9200
    connection-timeout: 1000
    socket-timeout: 30000

4. 创建 Elasticsearch 索引

定义一个实体类来映射 Elasticsearch 中的索引:

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;

@Document(indexName = "user_index")
public class User {

    @Id
    private Long id;
    private String name;
    private String email;

    // getters and setters
}

5. 编写 Elasticsearch 的 Repository

创建一个 Elasticsearch 的 Repository,用于保存数据到索引中:

import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

public interface UserRepository extends ElasticsearchRepository<User, Long> {
}

6. 监听 RabbitMQ 队列并同步到 Elasticsearch

创建一个 RabbitMQ 消息监听器,收到消息后解析并将数据存储到 Elasticsearch:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class CanalToEsSyncService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private ObjectMapper objectMapper = new ObjectMapper();

    // 监听 Canal 发送到 RabbitMQ 的队列消息
    @RabbitListener(queues = "example_queue")
    public void handleCanalMessage(String message) {
        try {
            // 假设 Canal 发送的是 JSON 格式的消息,可以用 Jackson 解析
            User user = objectMapper.readValue(message, User.class);

            // 将解析的 User 对象存入 Elasticsearch
            userRepository.save(user);

            // 手动确认消息已成功消费
            System.out.println("Message processed and indexed to Elasticsearch: " + user);
        } catch (Exception e) {
            e.printStackTrace();
            // 处理失败,可以记录日志或根据需要重试
        }
    }
}

http://www.kler.cn/a/308105.html

相关文章:

  • Git在版本控制中的应用
  • 一键生成本地SSL证书:打造HTTPS安全环境
  • 图论-代码随想录刷题记录[JAVA]
  • EXCEL延迟退休公式
  • zabbix搭建钉钉告警流程
  • Java 多线程(三)—— 死锁
  • python selenium网页操作
  • C++笔记---二叉搜索树
  • 动手学深度学习(pytorch)学习记录31-批量规范化(batch normalization)[学习记录]
  • C++基础面试题 | C++中的构造函数可以是虚函数吗? C++中的析构函数一定要是虚函数吗?
  • SpringBoot 消息队列RabbitMQ消息的可靠性 配置连接重试 生产者重连
  • 医学数据分析实训 项目三 关联规则分析作业--在线购物车分析--痹症方剂用药规律分析
  • 科技赋能司法:易保全如何重塑法律文书签署与庭审流程
  • yjs07——numpy数组的使用
  • 【Linux】-基本指令(上)
  • 7-16 一元多项式求导(vector)
  • Linux - iptables防火墙
  • 安全、稳定、高速的跨国文件传输系统
  • Vue3 : ref 与 reactive
  • 【DataSophon】Yarn配置历史服务器JobHistory和Spark集成historyServer
  • 【C++】list常见用法
  • 数据库基础(MySQL)
  • 【C++】——string类的模拟实现
  • 【网络】DNS,域名解析系统
  • Vue Application exit (SharedArrayBuffer is not defined)
  • 数据结构与算法-17高级数据结构_图论(迪杰斯特拉算法)