本文介绍: 双击mongo.exe执行若出现如下情况则成功。

开启Oplog

windows mongodb bin目录下找到配置文件/bin/mongod.cfg,配置如下

replication:
  replSetName: local
  oplogSizeMB: 1024

在这里插入图片描述
双击mongo.exe
在这里插入图片描述
在这里插入图片描述
执行

rs.initiate({_id: "local", members: [{_id: 0, host: "localhost:27017"}]})

若出现如下情况则成功

{
	"ok" : 1,
	"operationTime" : Timestamp(1627503341, 1),
	"$clusterTime" : {
		"clusterTime" : Timestamp(1627503341, 1),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	}
}

监听Oplog日志

pom

 	<parent&gt;
        <groupId&gt;org.springframework.boot</groupId&gt;
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.10</version>
        <relativePath/>
    </parent>
    
		<dependency>
        	<groupId>org.springframework.boot</groupId>
       	 	<artifactId>spring-boot-starter-data-mongodb</artifactId>
   	 	</dependency>
 		 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.12.7</version>
        </dependency>
        <dependency>
            <groupId>com.vividsolutions</groupId>
            <artifactId>jts</artifactId>
            <version>1.13</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-spatial</artifactId>
            <version>5.3.0.Beta1</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-java8</artifactId>
            <version>5.3.0.Beta1</version>
        </dependency>
        <dependency>
            <groupId>com.bedatadriven</groupId>
            <artifactId>jackson-datatype-jts</artifactId>
            <version>2.3</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>

配置

spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/databaseName?characterEncoding=UTF-8&amp;useUnicode=true&amp;useSSL=false&amp;tinyInt1isBit=false&amp;currentSchema=public
spring.datasource.username=postgres
spring.datasource.password=123456
spring.jpa.database=postgresql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.spatial.dialect.postgis.PostgisDialect
server.port=10050
spring.data.mongodb.uri=mongodb://admin:123456@localhost:27017/?authSource=admin
spring.data.mongodb.database=databseName

代码

  import com.mongodb.CursorType;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.util.JSON;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.Query;

@Slf4j
@Component
public class OplogListener implements ApplicationListener<ContextRefreshedEvent> {
    @Resource
    private MongoTemplate mongoTemplate;
    @Resource
    private EntityManager entityManager;


    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        MongoDatabase db = mongoTemplate.getMongoDatabaseFactory().getMongoDatabase("local");
        MongoCollection<Document> oplog = db.getCollection("oplog.rs");

        BsonTimestamp startTS = getStartTimestamp();
        BsonTimestamp endTS = getEndTimestamp();

        Bson filter = Filters.and(Filters.gt("ts", startTS));

        MongoCursor<Document> cursor = oplog.find(filter).cursorType(CursorType.TailableAwait).iterator();

        while (true) {
            if (cursor.hasNext()) {
                Document doc = cursor.next();
                String operation = doc.getString("op");

                if (!"n".equals(operation)) {
                    String namespace = doc.getString("ns");
                    String[] nsParts = StringUtils.split(namespace, ".");
                    String collectionName = nsParts[1];
                    String databaseName = nsParts[0];
                    Document object = (Document) doc.get("o");
                    log.info("同步数据:databse-{}  collention-{}  data-{}", databaseName, collectionName, object);
                    if ("i".equals(operation)) {
                        insert((Document) doc.get("o"), databaseName, collectionName);
                    } else if ("u".equals(operation)) {
                        update((Document) doc.get("o"), (Document) doc.get("o2"), databaseName, collectionName);
                    } else if ("d".equals(operation)) {
                        delete((Document) doc.get("o"), databaseName, collectionName);
                    }
                }
            }
        }
    }

    private BsonTimestamp getStartTimestamp() {
        long currentSeconds = System.currentTimeMillis() / 1000;
        return new BsonTimestamp((int) currentSeconds, 1);
    }

    private BsonTimestamp getEndTimestamp() {
        return new BsonTimestamp(0, 1);
    }

    private void insert(Document object, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            Query query = entityManager.createNativeQuery("INSERT INTO " + collectionName + " (json) VALUES (:json)");
            query.setParameter("json", json);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }

    private void update(Document object, Document update, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            String updateJson = JSON.serialize(update);
            Query query = entityManager.createNativeQuery("UPDATE " + collectionName + " SET json = :json WHERE json = :updateJson");
            query.setParameter("json", json);
            query.setParameter("updateJson", updateJson);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }

    private void delete(Document object, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            Query query = entityManager.createNativeQuery("DELETE FROM " + collectionName + " WHERE json = :json");
            query.setParameter("json", json);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }
}

原文地址:https://blog.csdn.net/qq_42702751/article/details/134734523

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_24958.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注