当前位置: 首页 > article >正文

从变更到通知:使用Python和MongoDB Change Streams实现即时事件监听

MongoDB提供了一种强大的功能,称为Change Streams,它允许应用程序监听数据库中的变更事件,并在数据发生变化时立即做出响应。这在mysql数据库是不具备没有这个功能的。又如:我们在支付环节想一直监听支付回调的状态,就必须定时循环。但是用到Change Streams监听功能,就不要低效率的定时循环做法。
在本文中,我们将探讨如何在Python中使用MongoDB的Change Streams来消费变更事件。
在这里插入图片描述

什么是MongoDB Change Streams?
MongoDB Change Streams是一种实时监听数据库集合变更的能力。它们可以用来监听以下类型的事件:

插入(insert)
更新(update)
删除(delete)
更多…
Change Streams可以在应用层提供数据变更的实时通知,这对于需要即时更新用户界面、触发业务流程或同步数据到其他服务的应用程序非常有用。

为什么使用Change Streams?
使用Change Streams而不是轮询机制有以下几个优势:

性能:轮询会不断查询数据库,这可能会导致不必要的负载。Change Streams仅在数据变更时提供通知,从而减少不必要的数据库交互。
实时性:Change Streams提供近乎实时的数据变更通知,这对于需要快速响应的应用程序至关重要。
可靠性:Change Streams保证不会错过任何变更事件,即使在应用程序重启之后也能从最后的位置继续监听。
如何在Python中使用Change Streams?
要在Python中使用Change Streams,您需要使用pymongo库,这是MongoDB的官方Python驱动程序。以下是如何设置和使用Change Streams的基本步骤:

要配置MongoDB的Change Streams以监控特定类型的变更,你可以使用MongoDB的聚合管道(Aggregation Pipeline)功能。通过提供一个或多个管道阶段的数组,你可以控制Change Streams的输出。

##初始化副本集
连接到任何一个MongoDB实例(通常是配置文件中指定的第一个实例),使用MongoDB Shell来初始化副本集:

mongosh   #mongo的命令行
use admin
rs.initiate({
  _id: "myReplicaSet",   #副本集名称
  members: [
    { _id: 0, host: "host1:27017" },  #host1要换成对应的公网ip才能访问,除非其它情况
    { _id: 1, host: "host2:27017" },
    { _id: 2, host: "host3:27017" }
  ]
})

在这里,host1:27017、host2:27017和host3:27017应该替换为您的MongoDB实例的实际IP地址和端口。_id是一个从0开始的唯一标识符,用于区分不同的成员。

在宝塔内如何处理
安装好mongdoDB
在这里插入图片描述
本地检查:
在这里插入图片描述
加入配置:
目录:
root@iZ2ze50t4ys98i5408heezZ:/www/server/mongodb/bin#

生成配置内

  • keyFile:
    (base) root@VM-4-7-ubuntu:/www/server/mongodb#
    openssl rand -base64 756 > /www/server/mongodb/keyfile
    chmod 400 /www/server/mongodb/keyfile
  • replication:
    replSetName: rs0 # rs0跟命令行创建的id名要一致

在下图加入配置
在这里插入图片描述

在mongdb命令行创建副本

在这里插入图片描述
MongoDB Shell支持命令历史记录功能,您可以使用上下箭头键来检索在Shell中发出的先前命令。

mondosh
rs0 [direct: primary] test> use admin
switched to db admin
rs0 [direct: primary] admin> db.auth("root","ubVvGqDmL7UAdwkG")
{ ok: 1 }
rs0 [direct: primary] admin>
##配置复制xx
var currentConfig = rs.conf();
var newConfig = { _id: "rs0", version: currentConfig.version + 1, members: [ { _id: 0, host: "8.152.219.111:27017" }] };
rs0 [direct: primary] admin> rs.reconfig(newConfig);
{
ok: 1,
'$clusterTime': {
clusterTime: Timestamp({ t: 1733560367, i: 1 }),
signature: {
hash: Binary.createFromBase64('DhedFtGMLmVNqwW7/0tGP91vKoA=', 0),
keyId: Long('7445547217475076102')
}
},
operationTime: Timestamp({ t: 1733560367, i: 1 })
}
rs0 [direct: primary] admin> rs.status() #的输出

在这里插入图片描述

然后重启,先kill -9 端口号,启动mongod命令

##使用客户端2种方式测试连接是否成功
在这里插入图片描述
在这里插入图片描述

以上如果测试通过,表示成功了。

使用代码测试:

monitor_mongo_changes.py

from pymongo import MongoClient
from pymongo.change_stream import ChangeStream

def monitor_mongo_changes(collection_name, database_name):
    """
    监控MongoDB文档变更的函数。

    :param collection_name: 要监控的MongoDB集合名称
    :param database_name: 要监控的MongoDB数据库名称
    """
    # 连接到MongoDB
    url = "mongodb://root:ub***G@8.7.0.1:27017/esg?authSource=admin&replicaSet=rs0"
    client = MongoClient(url,
                         serverSelectionTimeoutMS=10000)  # 设置超时时间为10秒


    # 选择数据库和集合
    db = client[database_name]
    collection = db[collection_name]

    # 创建一个管道,用于匹配特定条件的变更
    # pipeline = [{'$match': {'operationType': 'update'}}]
    pipeline = [
        {'$match': {'operationType': {'$in': ['insert', 'update','delete']}}}
    ]
    # 创建Change Stream并传入管道
    change_stream = collection.watch(pipeline)

    # 监听Change Stream
    for change in change_stream:
        print("Change detected:", change)
    print("end ....")

# 使用函数
monitor_mongo_changes('gress', 'esg')

运行python monitor_mongo_changes.py ,它不会自动退出,一直会监听mongdb数据库esg数据集的修改、插入、删除动作的。

在这里插入图片描述

用到的命令集

启动命令:mongod -f /www/server/mongodb/config.conf
查看启动状态:/etc/init.d/mongodb status
查看进程:ps -ef | grep mongo
mongosh    #进入mongdo的cli环境

http://www.kler.cn/a/430137.html

相关文章:

  • 《解锁图像的语言密码:Image Caption 开源神经网络项目全解析》
  • 备忘录记事工具 四款好用的电脑备忘录记事本分享
  • 深入理解 C 语言中浮点型数据在内存中的存储
  • 数据结构——栈的实现
  • 基于vue的商城小程序的毕业设计与实现(源码及报告)
  • 【大数据基础】大数据概述
  • 后端-pageHelp分页查询
  • synchronized的特性
  • 零基础微信小程序开发——小程序的宿主环境(保姆级教程+超详细)
  • 【日常记录-Git】git fetch
  • 河南师范大学在线评测系统(HTUOJ)正式上线啦!!!
  • 基于Pyhton的人脸识别(Python 3.12+face_recognition库)
  • ragflow连ollama时出现的Bug
  • Charts 教程:创建交互式图表的基础
  • 面试经典150题刷题——双指针部分
  • java+ssm+mysql房屋租赁管理系统
  • 页面置换算法模拟 最近最久未使用(LRU)算法
  • 数据结构第一弹-平衡树
  • leetcode_LCP 07
  • 现代C++ 21 any
  • 《筑牢网络安全防线:守护数字时代的生命线》
  • 阿里云ack部署rabbitmq集群
  • 网络原理之 TCP 协议
  • 启动hbase后没有hmaster进程
  • 二一(GIT4)、echarts(地图)、黑马就业数据平台(学生页-增 删 改)
  • 【论文阅读】Fifty Years of the ISCA: A Data-Driven Retrospective