Mongodb 开启oplog,java监听oplog并写入关系型数据库
开启Oplog
windows mongodb bin目录下找到配置文件/bin/mongod.cfg,配置如下:
replication:
replSetName: local
oplogSizeMB: 1024
双击mongo.exe
执行
rs.initiate({_id: "local", members: [{_id: 0, host: "localhost:27017"}]})
若出现如下情况则成功
{
"ok" : 1,
"operationTime" : Timestamp(1627503341, 1),
"$clusterTime" : {
"clusterTime" : Timestamp(1627503341, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
监听Oplog日志
pom
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.10</version>
<relativePath/>
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver</artifactId>
<version>3.12.7</version>
</dependency>
<dependency>
<groupId>com.vividsolutions</groupId>
<artifactId>jts</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-spatial</artifactId>
<version>5.3.0.Beta1</version>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-java8</artifactId>
<version>5.3.0.Beta1</version>
</dependency>
<dependency>
<groupId>com.bedatadriven</groupId>
<artifactId>jackson-datatype-jts</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
配置
spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/databaseName?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false¤tSchema=public
spring.datasource.username=postgres
spring.datasource.password=123456
spring.jpa.database=postgresql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.spatial.dialect.postgis.PostgisDialect
server.port=10050
spring.data.mongodb.uri=mongodb://admin:123456@localhost:27017/?authSource=admin
spring.data.mongodb.database=databseName
代码
import com.mongodb.CursorType;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.util.JSON;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.Query;
@Slf4j
@Component
public class OplogListener implements ApplicationListener<ContextRefreshedEvent> {
@Resource
private MongoTemplate mongoTemplate;
@Resource
private EntityManager entityManager;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
MongoDatabase db = mongoTemplate.getMongoDatabaseFactory().getMongoDatabase("local");
MongoCollection<Document> oplog = db.getCollection("oplog.rs");
BsonTimestamp startTS = getStartTimestamp();
BsonTimestamp endTS = getEndTimestamp();
Bson filter = Filters.and(Filters.gt("ts", startTS));
MongoCursor<Document> cursor = oplog.find(filter).cursorType(CursorType.TailableAwait).iterator();
while (true) {
if (cursor.hasNext()) {
Document doc = cursor.next();
String operation = doc.getString("op");
if (!"n".equals(operation)) {
String namespace = doc.getString("ns");
String[] nsParts = StringUtils.split(namespace, ".");
String collectionName = nsParts[1];
String databaseName = nsParts[0];
Document object = (Document) doc.get("o");
log.info("同步数据:databse-{} collention-{} data-{}", databaseName, collectionName, object);
if ("i".equals(operation)) {
insert((Document) doc.get("o"), databaseName, collectionName);
} else if ("u".equals(operation)) {
update((Document) doc.get("o"), (Document) doc.get("o2"), databaseName, collectionName);
} else if ("d".equals(operation)) {
delete((Document) doc.get("o"), databaseName, collectionName);
}
}
}
}
}
private BsonTimestamp getStartTimestamp() {
long currentSeconds = System.currentTimeMillis() / 1000;
return new BsonTimestamp((int) currentSeconds, 1);
}
private BsonTimestamp getEndTimestamp() {
return new BsonTimestamp(0, 1);
}
private void insert(Document object, String databaseName, String collectionName) {
entityManager.getTransaction().begin();
try {
String json = JSON.serialize(object);
Query query = entityManager.createNativeQuery("INSERT INTO " + collectionName + " (json) VALUES (:json)");
query.setParameter("json", json);
query.executeUpdate();
entityManager.getTransaction().commit();
} catch (Exception e) {
entityManager.getTransaction().rollback();
throw new RuntimeException(e);
}
}
private void update(Document object, Document update, String databaseName, String collectionName) {
entityManager.getTransaction().begin();
try {
String json = JSON.serialize(object);
String updateJson = JSON.serialize(update);
Query query = entityManager.createNativeQuery("UPDATE " + collectionName + " SET json = :json WHERE json = :updateJson");
query.setParameter("json", json);
query.setParameter("updateJson", updateJson);
query.executeUpdate();
entityManager.getTransaction().commit();
} catch (Exception e) {
entityManager.getTransaction().rollback();
throw new RuntimeException(e);
}
}
private void delete(Document object, String databaseName, String collectionName) {
entityManager.getTransaction().begin();
try {
String json = JSON.serialize(object);
Query query = entityManager.createNativeQuery("DELETE FROM " + collectionName + " WHERE json = :json");
query.setParameter("json", json);
query.executeUpdate();
entityManager.getTransaction().commit();
} catch (Exception e) {
entityManager.getTransaction().rollback();
throw new RuntimeException(e);
}
}
}