当前位置: 首页 > article >正文

Mongodb 开启oplog,java监听oplog并写入关系型数据库

开启Oplog

在 Windows 上,进入 MongoDB 安装目录下的 bin 目录,找到配置文件 mongod.cfg,添加如下配置:

# Run mongod as a replica-set member so that it maintains an oplog.
replication:
  # Replica set name; must match the _id passed to rs.initiate().
  replSetName: local
  # Maximum size of the oplog, in megabytes.
  oplogSizeMB: 1024

在这里插入图片描述
重启 MongoDB 服务使上述配置生效,然后双击 mongo.exe 打开 Mongo Shell
在这里插入图片描述
在这里插入图片描述
执行

rs.initiate({_id: "local", members: [{_id: 0, host: "localhost:27017"}]})

若出现如下情况则成功

{
	"ok" : 1,
	"operationTime" : Timestamp(1627503341, 1),
	"$clusterTime" : {
		"clusterTime" : Timestamp(1627503341, 1),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	}
}

监听Oplog日志

pom

 	<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.10</version>
        <relativePath/>
    </parent>
    
		<dependency>
        	<groupId>org.springframework.boot</groupId>
       	 	<artifactId>spring-boot-starter-data-mongodb</artifactId>
   	 	</dependency>
 		 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.12.7</version>
        </dependency>
        <dependency>
            <groupId>com.vividsolutions</groupId>
            <artifactId>jts</artifactId>
            <version>1.13</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-spatial</artifactId>
            <version>5.3.0.Beta1</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-java8</artifactId>
            <version>5.3.0.Beta1</version>
        </dependency>
        <dependency>
            <groupId>com.bedatadriven</groupId>
            <artifactId>jackson-datatype-jts</artifactId>
            <version>2.3</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>

配置

spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/databaseName?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&currentSchema=public
spring.datasource.username=postgres
spring.datasource.password=123456
spring.jpa.database=postgresql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.spatial.dialect.postgis.PostgisDialect
server.port=10050
spring.data.mongodb.uri=mongodb://admin:123456@localhost:27017/?authSource=admin
spring.data.mongodb.database=databaseName

代码

import com.mongodb.CursorType;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.util.JSON;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;

@Slf4j
@Component
public class OplogListener implements ApplicationListener<ContextRefreshedEvent> {
    @Resource
    private MongoTemplate mongoTemplate;
    @Resource
    private EntityManager entityManager;


    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        MongoDatabase db = mongoTemplate.getMongoDatabaseFactory().getMongoDatabase("local");
        MongoCollection<Document> oplog = db.getCollection("oplog.rs");

        BsonTimestamp startTS = getStartTimestamp();
        BsonTimestamp endTS = getEndTimestamp();

        Bson filter = Filters.and(Filters.gt("ts", startTS));

        MongoCursor<Document> cursor = oplog.find(filter).cursorType(CursorType.TailableAwait).iterator();

        while (true) {
            if (cursor.hasNext()) {
                Document doc = cursor.next();
                String operation = doc.getString("op");

                if (!"n".equals(operation)) {
                    String namespace = doc.getString("ns");
                    String[] nsParts = StringUtils.split(namespace, ".");
                    String collectionName = nsParts[1];
                    String databaseName = nsParts[0];
                    Document object = (Document) doc.get("o");
                    log.info("同步数据:databse-{}  collention-{}  data-{}", databaseName, collectionName, object);
                    if ("i".equals(operation)) {
                        insert((Document) doc.get("o"), databaseName, collectionName);
                    } else if ("u".equals(operation)) {
                        update((Document) doc.get("o"), (Document) doc.get("o2"), databaseName, collectionName);
                    } else if ("d".equals(operation)) {
                        delete((Document) doc.get("o"), databaseName, collectionName);
                    }
                }
            }
        }
    }

    private BsonTimestamp getStartTimestamp() {
        long currentSeconds = System.currentTimeMillis() / 1000;
        return new BsonTimestamp((int) currentSeconds, 1);
    }

    private BsonTimestamp getEndTimestamp() {
        return new BsonTimestamp(0, 1);
    }

    private void insert(Document object, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            Query query = entityManager.createNativeQuery("INSERT INTO " + collectionName + " (json) VALUES (:json)");
            query.setParameter("json", json);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }

    private void update(Document object, Document update, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            String updateJson = JSON.serialize(update);
            Query query = entityManager.createNativeQuery("UPDATE " + collectionName + " SET json = :json WHERE json = :updateJson");
            query.setParameter("json", json);
            query.setParameter("updateJson", updateJson);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }

    private void delete(Document object, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            Query query = entityManager.createNativeQuery("DELETE FROM " + collectionName + " WHERE json = :json");
            query.setParameter("json", json);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }
}

http://www.kler.cn/news/157317.html

相关文章:

  • 一天一个设计模式---生成器模式
  • 双目测宽仪高质量生产利器
  • 网页中的json文档,怎么保存到本地
  • 强化学习Markov重要公式推导过程
  • 你好!斐波那契查找【JAVA】
  • 黑猫带你学eMMC协议第31篇:什么是eMMC的驱动强度(Drive Strength)
  • 详解FreeRTOS:软件定时器(高级篇—4)
  • Rust 语言:认识 Rust
  • 考研英语语法(三十九)
  • 合并两个有序链表[简单]
  • UDS 诊断报文格式
  • Vue入门——v-on标签
  • 回溯和分支算法
  • Snagit 2024.0.1(Mac屏幕截图软件)
  • 【五分钟】熟练使用numpy.cumsum()函数(干货!!!)
  • 接口压测指南
  • Spring IOC—基于XML配置和管理Bean 万字详解(通俗易懂)
  • iOS ------ UICollectionView
  • Python —— Mock接口测试
  • ElasticSearch知识体系详解
  • 解码 SQL:深入探索 Antlr4 语法解析器背后的奥秘
  • Web前端 ---- 【vue】vue 组件传值(props、全局事件总线、消息的订阅与发布)
  • 10个顶级Linux开源反向代理服务器 - 解析与导航
  • 字节内部自动化测试教程:python+pytest接口自动化-接口测试一般流程及方法
  • CoreDNS实战(一)-构建高性能、插件化的DNS服务器
  • Azure Machine Learning - 使用 REST API 创建 Azure AI 搜索索引
  • 【云备份】项目介绍
  • SoC with CPLD and MCU ?
  • ODN光纤链路全程衰减如何计算
  • d3dx9_43.dll丢失原因以及5个解决方法详解