当前位置: 首页 > article >正文

Mongo数据库 --- Mongo Pipeline

Mongo数据库 --- Mongo Pipeline

  • 什么是Mongo Pipeline
  • Mongo Pipeline常用的几个Stage
  • Explanation with example:
    • MongoDB $match
    • MongoDB $project
    • MongoDB $group
    • MongoDB $unwind
    • MongoDB $count
    • MongoDB $addFields
  • Some Query Examples
  • 在C#中使用Aggreagtion Pipeline
    • **方法一: 使用RawBsonDocument**
    • 使用 Fluent API

什么是Mongo Pipeline

  • 在MongoDB中,聚合管道(aggregation pipeline)是一种用于处理和转换数据的机制。它允许您按顺序对集合中的文档执行一系列操作,并将结果从一个阶段传递到下一个阶段。聚合管道由多个阶段组成,每个阶段在输入文档上执行特定的操作,并生成转换后的输出,作为下一个阶段的输入。
  • 聚合管道中的每个阶段接收输入文档,并应用某种操作,例如过滤、分组、投影或排序,以生成一组修改后的文档。一个阶段的输出成为下一个阶段的输入,形成了一个数据处理的管道流程.
  • 聚合管道提供了一种强大而灵活的方式来执行复杂的数据处理任务,例如过滤、分组、排序、连接和聚合数据,所有这些都可以在MongoDB查询框架内完成。它能够高效地处理大型数据集,并提供了一种方便的方式来在返回结果之前对数据进行形状和操作
  • MongoDB的聚合管道(aggregation pipeline)是在MongoDB服务器上执行的。聚合管道操作在服务器端进行,而不是将数据取出并在本地内存中执行

Mongo Pipeline常用的几个Stage

  • $match:根据指定的条件筛选文档,类似于查询中的find操作。可以使用各种条件和表达式进行匹配。
  • $project:重新塑造文档结构,包括选择特定字段、排除字段、创建计算字段、重命名字段等。还可以使用表达式进行计算和转换。
  • $group:按照指定的键对文档进行分组,并对每个组执行聚合操作,如计算总和、平均值、计数等。可以进行多字段分组和多个聚合操作。
  • $sort:根据指定的字段对文档进行排序,可以指定升序或降序排序。
  • $limit:限制返回结果的文档数量,只保留指定数量的文档。
  • $skip:跳过指定数量的文档,返回剩余的文档。
  • $unwind:将包含数组的字段拆分为多个文档,每个文档包含数组中的一个元素。这在对数组字段进行聚合操作时很有用。
  • $lookup:执行左连接操作,将当前集合中的文档与其他集合中的文档进行关联。可以根据匹配条件将相关文档合并到结果中

Explanation with example:

Example使用以下数据

//University Collection
{
  country : 'Spain',
  city : 'Salamanca',
  name : 'USAL',
  location : {
    type : 'Point',
    coordinates : [ -5.6722512,17, 40.9607792 ]
  },
  students : [
    { year : 2014, number : 24774 },
    { year : 2015, number : 23166 },
    { year : 2016, number : 21913 },
    { year : 2017, number : 21715 }
  ]
}

{
  country : 'Spain',
  city : 'Salamanca',
  name : 'UPSA',
  location : {
    type : 'Point',
    coordinates : [ -5.6691191,17, 40.9631732 ]
  },
  students : [
    { year : 2014, number : 4788 },
    { year : 2015, number : 4821 },
    { year : 2016, number : 6550 },
    { year : 2017, number : 6125 }
  ]
}
//Course Collection
{
  university : 'USAL',
  name : 'Computer Science',
  level : 'Excellent'
}
{
  university : 'USAL',
  name : 'Electronics',
  level : 'Intermediate'
}
{
  university : 'USAL',
  name : 'Communication',
  level : 'Excellent'
}

MongoDB $match

  • 根据指定的条件筛选文档,类似于查询中的find操作。可以使用各种条件和表达式进行匹配。
db.universities.aggregate([
  { $match : { country : 'Spain', city : 'Salamanca' } }
]).pretty()

Output:

{
  country : 'Spain',
  city : 'Salamanca',
  name : 'USAL',
  location : {
    type : 'Point',
    coordinates : [ -5.6722512,17, 40.9607792 ]
  },
  students : [
    { year : 2014, number : 24774 },
    { year : 2015, number : 23166 },
    { year : 2016, number : 21913 },
    { year : 2017, number : 21715 }
  ]
}

{
  country : 'Spain',
  city : 'Salamanca',
  name : 'UPSA',
  location : {
    type : 'Point',
    coordinates : [ -5.6691191,17, 40.9631732 ]
  },
  students : [
    { year : 2014, number : 4788 },
    { year : 2015, number : 4821 },
    { year : 2016, number : 6550 },
    { year : 2017, number : 6125 }
  ]
}

MongoDB $project

  • 重新塑造文档结构,包括选择特定字段、排除字段、创建计算字段、重命名字段等。还可以使用表达式进行计算和转换
  • In the code that follows, please note that:
  • We must explicitly write _id : 0 when this field is not required
  • Apart from the _id field, it is sufficient to specify only those fields we need to obtain as a result of the query
db.universities.aggregate([
  { $project : { _id : 0, country : 1, city : 1, name : 1 } }
]).pretty()

Output:

{ "country" : "Spain", "city" : "Salamanca", "name" : "USAL" }
{ "country" : "Spain", "city" : "Salamanca", "name" : "UPSA" }

MongoDB $group

  • 按照指定的键对文档进行分组,并对每个组执行聚合操作,如计算总和、平均值、计数等。可以进行多字段分组和多个聚合操作。
//$sum : 1 的意思是计算每个分组中document的数量,$sum : 2 则是doccument数量的两倍
db.universities.aggregate([
  { $group : { _id : '$name', totaldocs : { $sum : 1 } } }
]).pretty()

Output:

{ "_id" : "UPSA", "totaldocs" : 1 }
{ "_id" : "USAL", "totaldocs" : 1 }
  • $group支持的operator
  • $count: Calculates the quantity of documents in the given group.
  • $max Displays the maximum value of a document’s field in the collection.
  • $min Displays the minimum value of a document’s field in the collection.
  • $avg Displays the average value of a document’s field in the collection.
  • $sum Sums up the specified values of all documents in the collection.
  • $push Adds extra values into the array of the resulting document.

MongoDB $unwind

  • 将包含数组的字段拆分为多个文档,每个文档包含数组中的一个元素。这在对数组字段进行聚合操作时很有用
db.universities.aggregate([
  { $match : { name : 'USAL' } },
  { $unwind : '$students' }
]).pretty()
  • unwind students会把每个student记录提取出来和剩下的属性拼起来变成一个独立的dcoument

Input

{
  country : 'Spain',
  city : 'Salamanca',
  name : 'USAL',
  location : {
    type : 'Point',
    coordinates : [ -5.6722512,17, 40.9607792 ]
  },
  students : [
    { year : 2014, number : 24774 },
    { year : 2015, number : 23166 },
  ]
}

Output:

{
	"_id" : ObjectId("5b7d9d9efbc9884f689cdba9"),
	"country" : "Spain",
	"city" : "Salamanca",
	"name" : "USAL",
	"location" : {
		"type" : "Point",
		"coordinates" : [-5.6722512,17,40.9607792]
	},
	"students" : {"year" : 2014, "number" : 24774}
}
{
	"_id" : ObjectId("5b7d9d9efbc9884f689cdba9"),
	"country" : "Spain",
	"city" : "Salamanca",
	"name" : "USAL",
	"location" : {
		"type" : "Point",
		"coordinates" : [-5.6722512,17,40.9607792]
	},
	"students" : {"year" : 2015,"number" : 23166}
}

MongoDB $count

  • The $count stage provides an easy way to check the number of documents obtained in the output of the previous stages of the pipeline
db.universities.aggregate([
  { $unwind : '$students' },
  { $count : 'total_documents' }
]).pretty()

Output:

{ "total_documents" : 8 }

MongoDB $addFields

  • It is possible that you need to make some changes to your output in the way of new fields. In the next example, we want to add the year of the foundation of the university.
  • addField 不会修改数据库
db.universities.aggregate([
  { $match : { name : 'USAL' } },
  { $addFields : { foundation_year : 1218 } }
]).pretty()
{
	"_id" : ObjectId("5b7d9d9efbc9884f689cdba9"),
	"country" : "Spain",
	"city" : "Salamanca",
	"name" : "USAL",
	"location" : {
		"type" : "Point",
		"coordinates" : [-5.6722512,17,40.9607792]
	},
	"students" : [
			{"year" : 2014,"number" : 24774},
			{"year" : 2015,"number" : 23166},
			{"year" : 2016,"number" : 21913},
			{"year" : 2017,"number" : 21715}
	],
	"foundation_year" : 1218
}

Some Query Examples

db.collection.aggregate([
  { $match: { age: { $gt: 30 } } }
])
db.collection.aggregate([
  { $group: { _id: "$category", total: { $sum: "$quantity" } } }
])
//This example selects the "name" field and creates a new field called "fullName" 
//by concatenating the "firstName" and "lastName" fields.
db.collection.aggregate([
  { $project: { _id: 0, name: 1, fullName: { $concat: ["$firstName", " ", "$lastName"] } } }
])
//This example sorts documents in descending order based on the "age" field.
db.collection.aggregate([
  { $sort: { age: -1 } }
])
//This example limits the output to only the first 10 documents.
db.collection.aggregate([
  { $limit: 10 }
])

在C#中使用Aggreagtion Pipeline

方法一: 使用RawBsonDocument

BsonDocument pipelineStage1 = new BsonDocument{
    {
        "$match", new BsonDocument{
            { "username", "nraboy" }
        }
    }
};

BsonDocument pipelineStage2 = new BsonDocument{
    { 
        "$project", new BsonDocument{
            { "_id", 1 },
            { "username", 1 },
            { 
                "items", new BsonDocument{
                    {
                        "$map", new BsonDocument{
                            { "input", "$items" },
                            { "as", "item" },
                            {
                                "in", new BsonDocument{
                                    {
                                        "$convert", new BsonDocument{
                                            { "input", "$$item" },
                                            { "to", "objectId" }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
};

BsonDocument pipelineStage3 = new BsonDocument{
    {
        "$lookup", new BsonDocument{
            { "from", "movies" },
            { "localField", "items" },
            { "foreignField", "_id" },
            { "as", "movies" }
        }
    }
};

BsonDocument pipelineStage4 = new BsonDocument{
    { "$unwind", "$movies" }
};

BsonDocument pipelineStage5 = new BsonDocument{
    {
        "$group", new BsonDocument{
            { "_id", "$_id" },
            { 
                "username", new BsonDocument{
                    { "$first", "$username" }
                } 
            },
            { 
                "movies", new BsonDocument{
                    { "$addToSet", "$movies" }
                }
            }
        }
    }
};

BsonDocument[] pipeline = new BsonDocument[] { 
    pipelineStage1, 
    pipelineStage2, 
    pipelineStage3, 
    pipelineStage4, 
    pipelineStage5 
};

List<BsonDocument> pResults = playlistCollection.Aggregate<BsonDocument>(pipeline).ToList();

foreach(BsonDocument pResult in pResults) {
    Console.WriteLine(pResult);
}

使用 Fluent API

var pResults = playlistCollection.Aggregate()
    .Match(new BsonDocument{{ "username", "nraboy" }})
    .Project(new BsonDocument{
            { "_id", 1 },
            { "username", 1 },
            {
                "items", new BsonDocument{
                    {
                        "$map", new BsonDocument{
                            { "input", "$items" },
                            { "as", "item" },
                            {
                                "in", new BsonDocument{
                                    {
                                        "$convert", new BsonDocument{
                                            { "input", "$$item" },
                                            { "to", "objectId" }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        })
    .Lookup("movies", "items", "_id", "movies")
    .Unwind("movies")
    .Group(new BsonDocument{
            { "_id", "$_id" },
            {
                "username", new BsonDocument{
                    { "$first", "$username" }
                }
            },
            {
                "movies", new BsonDocument{
                    { "$addToSet", "$movies" }
                }
            }
        })
    .ToList();

foreach(var pResult in pResults) {
    Console.WriteLine(pResult);
}

http://www.kler.cn/a/411642.html

相关文章:

  • [STM32]从零开始的STM32 FreeRTOS移植教程
  • 异步编程中,为什么必须将conn放到后台连接
  • 星火一号开发板总结
  • 【04】Selenium+Python 手动添加Cookie免登录(实例)
  • ️ 爬虫开发中常见的性能优化策略有哪些?
  • Delphi ADO组件中的 ADOTable、ADOQurey 无SQL语句实现增、删、改、查
  • 量子安全与经典密码学:一些现实方面的讨论
  • 分布式在线评测系统
  • 【机器视觉 OCR】适合Python开发的OCR工具:深入解析与实战应用
  • Python学习34天
  • 在Unity中实现物体动画的完整流程
  • HTTP 管道传输与多路复用
  • 数据结构与算法学习笔记----队列
  • 大数据面试SQL题-笔记02【查询、连接、聚合函数】
  • 大语言模型---Llama不同系列的权重参数文件提取;Llama-7B权重文件提取;Llama-8B权重文件提取;主要代码功能解析
  • (已解决)wps无法加载此加载项程序mathpage.wll
  • 音视频技术扫盲之预测编码的基本原理探究
  • 基于Matlab扩展卡尔曼滤波的主从导航系统传递对准仿真与优化研究
  • SpringBoot(三十九)SpringBoot集成RabbitMQ实现流量削峰添谷
  • Oracle 深入学习 Part 9: Storage Structure and Relationships(存储结构与关系)
  • 音视频相关的一些基本概念
  • 前后端分离,后端拦截器无法获得前端请求的token
  • 快速理解微服务中Ribbon的概念
  • 01.Django快速入门
  • Redis核心类型----有序集合
  • 案例分析:嵌入式边缘计算机ARMxy在工商储能柜新能源应用