MongoDB:如何将多个集合中的数据合并为一个?

我如何(在MongoDB)结合数据从多个集合到一个集合?

我可以使用地图减少,如果是,然后如何?

我非常感谢一些例子,因为我是一个新手。

434573 次浏览

你必须在你的应用层做这些。如果您正在使用ORM,它可以使用注释(或类似的东西)来提取存在于其他集合中的引用。我只使用过吗啡,并且@Reference注释在查询时获取引用的实体,因此我能够避免自己在代码中这样做。

虽然不能实时执行,但可以多次运行map-reduce,使用MongoDB 1.8+ map/reduce中的"reduce" out选项将数据合并在一起(参见http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Outputoptions)。您需要在两个集合中都有一些可以用作_id的键。

例如,假设你有一个users集合和一个comments集合,你想要有一个新的集合,其中每个评论都有一些用户统计信息。

让我们说users集合有以下字段:

  • _id
  • firstName
  • 国家
  • 性别
  • 年龄

然后comments集合有以下字段:

  • _id
  • 用户标识
  • 评论
  • 创建

你可以这样做:

var mapUsers, mapComments, reduce;
db.users_comments.remove();


// setup sample data - wouldn't actually use this in production
db.users.remove();
db.comments.remove();
db.users.save({firstName:"Rich",lastName:"S",gender:"M",country:"CA",age:"18"});
db.users.save({firstName:"Rob",lastName:"M",gender:"M",country:"US",age:"25"});
db.users.save({firstName:"Sarah",lastName:"T",gender:"F",country:"US",age:"13"});
var users = db.users.find();
db.comments.save({userId: users[0]._id, "comment": "Hey, what's up?", created: new ISODate()});
db.comments.save({userId: users[1]._id, "comment": "Not much", created: new ISODate()});
db.comments.save({userId: users[0]._id, "comment": "Cool", created: new ISODate()});
// end sample data setup


mapUsers = function() {
var values = {
country: this.country,
gender: this.gender,
age: this.age
};
emit(this._id, values);
};
mapComments = function() {
var values = {
commentId: this._id,
comment: this.comment,
created: this.created
};
emit(this.userId, values);
};
reduce = function(k, values) {
var result = {}, commentFields = {
"commentId": '',
"comment": '',
"created": ''
};
values.forEach(function(value) {
var field;
if ("comment" in value) {
if (!("comments" in result)) {
result.comments = [];
}
result.comments.push(value);
} else if ("comments" in value) {
if (!("comments" in result)) {
result.comments = [];
}
result.comments.push.apply(result.comments, value.comments);
}
for (field in value) {
if (value.hasOwnProperty(field) && !(field in commentFields)) {
result[field] = value[field];
}
}
});
return result;
};
db.users.mapReduce(mapUsers, reduce, {"out": {"reduce": "users_comments"}});
db.comments.mapReduce(mapComments, reduce, {"out": {"reduce": "users_comments"}});
db.users_comments.find().pretty(); // see the resulting collection

此时,你将拥有一个名为users_comments的新集合,其中包含合并后的数据,你现在可以使用它了。这些简化的集合都有_id,这是你在map函数中发出的键,然后所有的值都是value键内的子对象-这些值不在这些简化文档的顶层。

这是一个比较简单的例子。您可以重复使用更多的集合,只要您想继续构建减少的集合。您还可以在该过程中对数据进行总结和聚合。随着聚合和保存现有字段的逻辑变得更加复杂,您可能会定义多个reduce函数。

您还会注意到,现在每个用户都有一个文档,数组中包含该用户的所有评论。如果我们合并的数据是一对一的关系,而不是一对多的关系,它将是平坦的,你可以简单地使用这样的reduce函数:

reduce = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};

如果你想平铺users_comments集合,使它是一个文档每个注释,另外运行这个:

var map, reduce;
map = function() {
var debug = function(value) {
var field;
for (field in value) {
print(field + ": " + value[field]);
}
};
debug(this);
var that = this;
if ("comments" in this.value) {
this.value.comments.forEach(function(value) {
emit(value.commentId, {
userId: that._id,
country: that.value.country,
age: that.value.age,
comment: value.comment,
created: value.created,
});
});
}
};
reduce = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
db.users_comments.mapReduce(map, reduce, {"out": "comments_with_demographics"});

这个技巧绝对不应该在飞行中执行。它适用于定期更新合并数据的cron作业或类似的工作。你可能想在新集合上运行ensureIndex,以确保你对它执行的查询能够快速运行(请记住,你的数据仍然在value键中,所以如果你要在注释created时间上索引comments_with_demographics,它将是db.comments_with_demographics.ensureIndex({"value.created": 1});

代码片段。礼貌-关于堆栈溢出的多个帖子,包括这一篇。

 db.cust.drop();
db.zip.drop();
db.cust.insert({cust_id:1, zip_id: 101});
db.cust.insert({cust_id:2, zip_id: 101});
db.cust.insert({cust_id:3, zip_id: 101});
db.cust.insert({cust_id:4, zip_id: 102});
db.cust.insert({cust_id:5, zip_id: 102});


db.zip.insert({zip_id:101, zip_cd:'AAA'});
db.zip.insert({zip_id:102, zip_cd:'BBB'});
db.zip.insert({zip_id:103, zip_cd:'CCC'});


mapCust = function() {
var values = {
cust_id: this.cust_id
};
emit(this.zip_id, values);
};


mapZip = function() {
var values = {
zip_cd: this.zip_cd
};
emit(this.zip_id, values);
};


reduceCustZip =  function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
if ("cust_id" in value) {
if (!("cust_ids" in result)) {
result.cust_ids = [];
}
result.cust_ids.push(value);
} else {
for (field in value) {
if (value.hasOwnProperty(field) ) {
result[field] = value[field];
}
};
}
});
return result;
};




db.cust_zip.drop();
db.cust.mapReduce(mapCust, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.zip.mapReduce(mapZip, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.cust_zip.find();




mapCZ = function() {
var that = this;
if ("cust_ids" in this.value) {
this.value.cust_ids.forEach(function(value) {
emit(value.cust_id, {
zip_id: that._id,
zip_cd: that.value.zip_cd
});
});
}
};


reduceCZ = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
db.cust_zip_joined.drop();
db.cust_zip.mapReduce(mapCZ, reduceCZ, {"out": "cust_zip_joined"});
db.cust_zip_joined.find().pretty();




var flattenMRCollection=function(dbName,collectionName) {
var collection=db.getSiblingDB(dbName)[collectionName];


var i=0;
var bulk=collection.initializeUnorderedBulkOp();
collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
print((++i));
//collection.update({_id: result._id},result.value);


bulk.find({_id: result._id}).replaceOne(result.value);


if(i%1000==0)
{
print("Executing bulk...");
bulk.execute();
bulk=collection.initializeUnorderedBulkOp();
}
});
bulk.execute();
};




flattenMRCollection("mydb","cust_zip_joined");
db.cust_zip_joined.find().pretty();

Mongorestore有这样一个特性,即在数据库中已经存在的数据之上追加数据,所以这个行为可以用于组合两个集合:

  1. mongodump文物
  2. collection2.rename(文物)
  3. mongorestore

还没有尝试过,但它可能比map/reduce方法执行得更快。

如果mongodb中没有批量插入,我们循环small_collection中的所有对象,并将它们逐个插入到big_collection中:

db.small_collection.find().forEach(function(obj){
db.big_collection.insert(obj)
});

是的,你可以,拿我今天写的这个效用函数来说

function shangMergeCol() {
tcol= db.getCollection(arguments[0]);
for (var i=1; i<arguments.length; i++){
scol= db.getCollection(arguments[i]);
scol.find().forEach(
function (d) {
tcol.insert(d);
}
)
}
}

你可以传递给这个函数任意数量的集合,第一个将是目标集合。其余所有集合都是要传输到目标集合的源。

MongoDB 3.2现在允许通过$lookup聚合阶段将多个集合中的数据合并到一个集合中。作为一个实际的例子,假设您有关于书籍的数据,分为两个不同的集合。

第一个集合,称为books,具有以下数据:

{
"isbn": "978-3-16-148410-0",
"title": "Some cool book",
"author": "John Doe"
}
{
"isbn": "978-3-16-148999-9",
"title": "Another awesome book",
"author": "Jane Roe"
}

第二个集合名为books_selling_data,包含以下数据:

{
"_id": ObjectId("56e31bcf76cdf52e541d9d26"),
"isbn": "978-3-16-148410-0",
"copies_sold": 12500
}
{
"_id": ObjectId("56e31ce076cdf52e541d9d28"),
"isbn": "978-3-16-148999-9",
"copies_sold": 720050
}
{
"_id": ObjectId("56e31ce076cdf52e541d9d29"),
"isbn": "978-3-16-148999-9",
"copies_sold": 1000
}

要合并这两个集合,只需按以下方式使用$lookup:

db.books.aggregate([{
$lookup: {
from: "books_selling_data",
localField: "isbn",
foreignField: "isbn",
as: "copies_sold"
}
}])

在此聚合之后,books集合将看起来如下所示:

{
"isbn": "978-3-16-148410-0",
"title": "Some cool book",
"author": "John Doe",
"copies_sold": [
{
"_id": ObjectId("56e31bcf76cdf52e541d9d26"),
"isbn": "978-3-16-148410-0",
"copies_sold": 12500
}
]
}
{
"isbn": "978-3-16-148999-9",
"title": "Another awesome book",
"author": "Jane Roe",
"copies_sold": [
{
"_id": ObjectId("56e31ce076cdf52e541d9d28"),
"isbn": "978-3-16-148999-9",
"copies_sold": 720050
},
{
"_id": ObjectId("56e31ce076cdf52e541d9d28"),
"isbn": "978-3-16-148999-9",
"copies_sold": 1000
}
]
}

有几件事值得注意:

  1. "from"集合,在本例中为books_selling_data,不能被分片。
  2. “as”字段将是一个数组,如上面的例子所示。
  3. 美元的查找工作台中的"localField"和"foreignField"选项如果在各自的集合中不存在,出于匹配目的将被视为空(美元查找文档中有一个完美的例子)。

因此,作为一个结论,如果你想合并两个集合,在这种情况下,有一个平坦的copyes_sold字段和售出的总副本,你将不得不多做一些工作,可能会使用一个中间集合,然后从美元了到最终集合。

非常基本的$lookup示例。

db.getCollection('users').aggregate([
{
$lookup: {
from: "userinfo",
localField: "userId",
foreignField: "userId",
as: "userInfoData"
}
},
{
$lookup: {
from: "userrole",
localField: "userId",
foreignField: "userId",
as: "userRoleData"
}
},
{ $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true }},
{ $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true }}
])

这里用到了

 { $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true }},
{ $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true }}

而不是

{ $unwind:"$userRoleData"}
{ $unwind:"$userRoleData"}

因为{$放松:“$ userRoleData "}将返回空或0结果,如果没有匹配的记录找到$lookup。

在聚合中为多个集合使用多个美元的查找

查询:

db.getCollection('servicelocations').aggregate([
{
$match: {
serviceLocationId: {
$in: ["36728"]
}
}
},
{
$lookup: {
from: "orders",
localField: "serviceLocationId",
foreignField: "serviceLocationId",
as: "orders"
}
},
{
$lookup: {
from: "timewindowtypes",
localField: "timeWindow.timeWindowTypeId",
foreignField: "timeWindowTypeId",
as: "timeWindow"
}
},
{
$lookup: {
from: "servicetimetypes",
localField: "serviceTimeTypeId",
foreignField: "serviceTimeTypeId",
as: "serviceTime"
}
},
{
$unwind: "$orders"
},
{
$unwind: "$serviceTime"
},
{
$limit: 14
}
])

结果:

{
"_id" : ObjectId("59c3ac4bb7799c90ebb3279b"),
"serviceLocationId" : "36728",
"regionId" : 1.0,
"zoneId" : "DXBZONE1",
"description" : "AL HALLAB REST EMIRATES MALL",
"locationPriority" : 1.0,
"accountTypeId" : 1.0,
"locationType" : "SERVICELOCATION",
"location" : {
"makani" : "",
"lat" : 25.119035,
"lng" : 55.198694
},
"deliveryDays" : "MTWRFSU",
"timeWindow" : [
{
"_id" : ObjectId("59c3b0a3b7799c90ebb32cde"),
"timeWindowTypeId" : "1",
"Description" : "MORNING",
"timeWindow" : {
"openTime" : "06:00",
"closeTime" : "08:00"
},
"accountId" : 1.0
},
{
"_id" : ObjectId("59c3b0a3b7799c90ebb32cdf"),
"timeWindowTypeId" : "1",
"Description" : "MORNING",
"timeWindow" : {
"openTime" : "09:00",
"closeTime" : "10:00"
},
"accountId" : 1.0
},
{
"_id" : ObjectId("59c3b0a3b7799c90ebb32ce0"),
"timeWindowTypeId" : "1",
"Description" : "MORNING",
"timeWindow" : {
"openTime" : "10:30",
"closeTime" : "11:30"
},
"accountId" : 1.0
}
],
"address1" : "",
"address2" : "",
"phone" : "",
"city" : "",
"county" : "",
"state" : "",
"country" : "",
"zipcode" : "",
"imageUrl" : "",
"contact" : {
"name" : "",
"email" : ""
},
"status" : "ACTIVE",
"createdBy" : "",
"updatedBy" : "",
"updateDate" : "",
"accountId" : 1.0,
"serviceTimeTypeId" : "1",
"orders" : [
{
"_id" : ObjectId("59c3b291f251c77f15790f92"),
"orderId" : "AQ18O1704264",
"serviceLocationId" : "36728",
"orderNo" : "AQ18O1704264",
"orderDate" : "18-Sep-17",
"description" : "AQ18O1704264",
"serviceType" : "Delivery",
"orderSource" : "Import",
"takenBy" : "KARIM",
"plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
"plannedDeliveryTime" : "",
"actualDeliveryDate" : "",
"actualDeliveryTime" : "",
"deliveredBy" : "",
"size1" : 296.0,
"size2" : 3573.355,
"size3" : 240.811,
"jobPriority" : 1.0,
"cancelReason" : "",
"cancelDate" : "",
"cancelBy" : "",
"reasonCode" : "",
"reasonText" : "",
"status" : "",
"lineItems" : [
{
"ItemId" : "BNWB020",
"size1" : 15.0,
"size2" : 78.6,
"size3" : 6.0
},
{
"ItemId" : "BNWB021",
"size1" : 20.0,
"size2" : 252.0,
"size3" : 11.538
},
{
"ItemId" : "BNWB023",
"size1" : 15.0,
"size2" : 285.0,
"size3" : 16.071
},
{
"ItemId" : "CPMW112",
"size1" : 3.0,
"size2" : 25.38,
"size3" : 1.731
},
{
"ItemId" : "MMGW001",
"size1" : 25.0,
"size2" : 464.375,
"size3" : 46.875
},
{
"ItemId" : "MMNB218",
"size1" : 50.0,
"size2" : 920.0,
"size3" : 60.0
},
{
"ItemId" : "MMNB219",
"size1" : 50.0,
"size2" : 630.0,
"size3" : 40.0
},
{
"ItemId" : "MMNB220",
"size1" : 50.0,
"size2" : 416.0,
"size3" : 28.846
},
{
"ItemId" : "MMNB270",
"size1" : 50.0,
"size2" : 262.0,
"size3" : 20.0
},
{
"ItemId" : "MMNB302",
"size1" : 15.0,
"size2" : 195.0,
"size3" : 6.0
},
{
"ItemId" : "MMNB373",
"size1" : 3.0,
"size2" : 45.0,
"size3" : 3.75
}
],
"accountId" : 1.0
},
{
"_id" : ObjectId("59c3b291f251c77f15790f9d"),
"orderId" : "AQ137O1701240",
"serviceLocationId" : "36728",
"orderNo" : "AQ137O1701240",
"orderDate" : "18-Sep-17",
"description" : "AQ137O1701240",
"serviceType" : "Delivery",
"orderSource" : "Import",
"takenBy" : "KARIM",
"plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
"plannedDeliveryTime" : "",
"actualDeliveryDate" : "",
"actualDeliveryTime" : "",
"deliveredBy" : "",
"size1" : 28.0,
"size2" : 520.11,
"size3" : 52.5,
"jobPriority" : 1.0,
"cancelReason" : "",
"cancelDate" : "",
"cancelBy" : "",
"reasonCode" : "",
"reasonText" : "",
"status" : "",
"lineItems" : [
{
"ItemId" : "MMGW001",
"size1" : 25.0,
"size2" : 464.38,
"size3" : 46.875
},
{
"ItemId" : "MMGW001-F1",
"size1" : 3.0,
"size2" : 55.73,
"size3" : 5.625
}
],
"accountId" : 1.0
},
{
"_id" : ObjectId("59c3b291f251c77f15790fd8"),
"orderId" : "AQ110O1705036",
"serviceLocationId" : "36728",
"orderNo" : "AQ110O1705036",
"orderDate" : "18-Sep-17",
"description" : "AQ110O1705036",
"serviceType" : "Delivery",
"orderSource" : "Import",
"takenBy" : "KARIM",
"plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
"plannedDeliveryTime" : "",
"actualDeliveryDate" : "",
"actualDeliveryTime" : "",
"deliveredBy" : "",
"size1" : 60.0,
"size2" : 1046.0,
"size3" : 68.0,
"jobPriority" : 1.0,
"cancelReason" : "",
"cancelDate" : "",
"cancelBy" : "",
"reasonCode" : "",
"reasonText" : "",
"status" : "",
"lineItems" : [
{
"ItemId" : "MMNB218",
"size1" : 50.0,
"size2" : 920.0,
"size3" : 60.0
},
{
"ItemId" : "MMNB219",
"size1" : 10.0,
"size2" : 126.0,
"size3" : 8.0
}
],
"accountId" : 1.0
}
],
"serviceTime" : {
"_id" : ObjectId("59c3b07cb7799c90ebb32cdc"),
"serviceTimeTypeId" : "1",
"serviceTimeType" : "nohelper",
"description" : "",
"fixedTime" : 30.0,
"variableTime" : 0.0,
"accountId" : 1.0
}
}

在MongoDB中以“SQL UNION”的方式进行联合,可以在单个查询中使用聚合和查找。下面是我测试过的MongoDB 4.0的一个例子:

// Create employees data for testing the union.
db.getCollection('employees').insert({ name: "John", type: "employee", department: "sales" });
db.getCollection('employees').insert({ name: "Martha", type: "employee", department: "accounting" });
db.getCollection('employees').insert({ name: "Amy", type: "employee", department: "warehouse" });
db.getCollection('employees').insert({ name: "Mike", type: "employee", department: "warehouse"  });


// Create freelancers data for testing the union.
db.getCollection('freelancers').insert({ name: "Stephany", type: "freelancer", department: "accounting" });
db.getCollection('freelancers').insert({ name: "Martin", type: "freelancer", department: "sales" });
db.getCollection('freelancers').insert({ name: "Doug", type: "freelancer", department: "warehouse"  });
db.getCollection('freelancers').insert({ name: "Brenda", type: "freelancer", department: "sales"  });


// Here we do a union of the employees and freelancers using a single aggregation query.
db.getCollection('freelancers').aggregate( // 1. Use any collection containing at least one document.
[
{ $limit: 1 }, // 2. Keep only one document of the collection.
{ $project: { _id: '$$REMOVE' } }, // 3. Remove everything from the document.


// 4. Lookup collections to union together.
{ $lookup: { from: 'employees', pipeline: [{ $match: { department: 'sales' } }], as: 'employees' } },
{ $lookup: { from: 'freelancers', pipeline: [{ $match: { department: 'sales' } }], as: 'freelancers' } },


// 5. Union the collections together with a projection.
{ $project: { union: { $concatArrays: ["$employees", "$freelancers"] } } },


// 6. Unwind and replace root so you end up with a result set.
{ $unwind: '$union' },
{ $replaceRoot: { newRoot: '$union' } }
]);

下面是它的工作原理:

  1. 从数据库的any集合中实例化一个aggregate,其中至少有一个文档。如果你不能保证你的数据库的任何集合都不会是空的,你可以通过在你的数据库中创建某种“虚拟”集合来解决这个问题,其中包含一个专门用于联合查询的空文档。

  2. 使管道的第一阶段为{ $limit: 1 }。这将剥离集合中除第一个文档外的所有文档。

  3. 通过使用$project阶段剥离剩余文档的所有字段:

    { $project: { _id: '$$REMOVE' } }
    
  4. Your aggregate now contains a single, empty document. It's time to add lookups for each collection you want to union together. You may use the pipeline field to do some specific filtering, or leave localField and foreignField as null to match the whole collection.

    { $lookup: { from: 'collectionToUnion1', pipeline: [...], as: 'Collection1' } },
    { $lookup: { from: 'collectionToUnion2', pipeline: [...], as: 'Collection2' } },
    { $lookup: { from: 'collectionToUnion3', pipeline: [...], as: 'Collection3' } }
    
  5. You now have an aggregate containing a single document that contains 3 arrays like this:

    {
    Collection1: [...],
    Collection2: [...],
    Collection3: [...]
    }
    

    然后,你可以使用$project阶段和$concatArrays聚合操作符将它们合并成一个数组:

    {
    "$project" :
    {
    "Union" : { $concatArrays: ["$Collection1", "$Collection2", "$Collection3"] }
    }
    }
    
  6. You now have an aggregate containing a single document, into which is located an array that contains your union of collections. What remains to be done is to add an $unwind and a $replaceRoot stage to split your array into separate documents:

    { $unwind: "$Union" },
    { $replaceRoot: { newRoot: "$Union" } }
    
  7. Voilà. You now have a result set containing the collections you wanted to union together. You can then add more stages to filter it further, sort it, apply skip() and limit(). Pretty much anything you want.

Mongo 4.4开始,我们可以通过将新的$unionWith聚合阶段与$group的新$accumulator操作符耦合起来,在聚合管道中实现这种连接:

// > db.users.find()
//   [{ user: 1, name: "x" }, { user: 2, name: "y" }]
// > db.books.find()
//   [{ user: 1, book: "a" }, { user: 1, book: "b" }, { user: 2, book: "c" }]
// > db.movies.find()
//   [{ user: 1, movie: "g" }, { user: 2, movie: "h" }, { user: 2, movie: "i" }]
db.users.aggregate([
{ $unionWith: "books"  },
{ $unionWith: "movies" },
{ $group: {
_id: "$user",
user: {
$accumulator: {
accumulateArgs: ["$name", "$book", "$movie"],
init: function() { return { books: [], movies: [] } },
accumulate: function(user, name, book, movie) {
if (name) user.name = name;
if (book) user.books.push(book);
if (movie) user.movies.push(movie);
return user;
},
merge: function(userV1, userV2) {
if (userV2.name) userV1.name = userV2.name;
userV1.books.concat(userV2.books);
userV1.movies.concat(userV2.movies);
return userV1;
},
lang: "js"
}
}
}}
])
// { _id: 1, user: { books: ["a", "b"], movies: ["g"], name: "x" } }
// { _id: 2, user: { books: ["c"], movies: ["h", "i"], name: "y" } }
  • $unionWith将给定集合中的记录合并到聚合管道中已经存在的文档中。在两个合并阶段之后,我们就拥有了管道中的所有用户、书籍和电影记录。

  • 然后,我们通过$user $group记录,并使用$accumulator操作符对项进行累加,允许在文档分组时自定义累加:

    • 我们感兴趣的字段是用accumulateArgs定义的。
    • init定义了元素分组时将累积的状态。
    • accumulate函数允许对被分组的记录执行自定义操作,以构建累积状态。例如,如果被分组的项定义了book字段,则更新状态的books部分。
    • merge用于合并两个内部状态。它只用于在分片集群上运行的聚合,或者当操作超过内存限制时。
    • 李< / ul > < / >