串行地对 mongodb 游标进行迭代(在移动到下一个文档之前等待回调)

使用 mongoskin,我可以执行下面这样的查询,它将返回一个光标:

myCollection.find({}, function(err, resultCursor) {
resultCursor.each(function(err, result) {


}
}

但是,我希望为每个文档调用一些异步函数,并且只在回调之后移到游标上的下一个项目(类似于 sync.js 模块中的 eachSeries 结构)。例如:

myCollection.find({}, function(err, resultCursor) {
resultCursor.each(function(err, result) {


externalAsyncFunction(result, function(err) {
//externalAsyncFunction completed - now want to move to next doc
});


}
}

我怎么能这么做?

谢谢

更新:

我不想使用 toArray(),因为这是一个大的批处理操作,结果可能不适合在内存中一次完成。

46882 次浏览

您可以在 Array中获得结果,并使用递归函数进行迭代,如下所示。

myCollection.find({}).toArray(function (err, items) {
var count = items.length;
var fn = function () {
externalAsyncFuntion(items[count], function () {
count -= 1;
if (count) fn();
})
}


fn();
});

编辑:

这只适用于较小的数据集,对于较大的数据集,您应该使用其他答案中提到的游标。

如果不想使用 toArray 将所有结果加载到内存中,可以使用光标进行迭代,如下所示。

myCollection.find({}, function(err, resultCursor) {
function processItem(err, item) {
if(item === null) {
return; // All done!
}


externalAsyncFunction(item, function(err) {
resultCursor.nextObject(processItem);
});


}


resultCursor.nextObject(processItem);
}

您可以使用异步库执行类似的操作。这里的关键点是检查当前文档是否为空。如果是的话,就意味着你完蛋了。

async.series([
function (cb) {
cursor.each(function (err, doc) {
if (err) {
cb(err);
} else if (doc === null) {
cb();
} else {
console.log(doc);
array.push(doc);
}
});
}
], function (err) {
callback(err, array);
});

您可以使用简单的 setTimeOut。这是一个在 nodejs 上运行的类型脚本的例子(我通过‘ when’模块使用了一些承诺,但是也可以不使用这些承诺) :

        import mongodb = require("mongodb");


var dbServer = new mongodb.Server('localhost', 27017, {auto_reconnect: true}, {});
var db =  new mongodb.Db('myDb', dbServer);


var util = require('util');
var when = require('when'); //npm install when


var dbDefer = when.defer();
db.open(function() {
console.log('db opened...');
dbDefer.resolve(db);
});


dbDefer.promise.then(function(db : mongodb.Db){
db.collection('myCollection', function (error, dataCol){
if(error) {
console.error(error); return;
}


var doneReading = when.defer();


var processOneRecordAsync = function(record) : When.Promise{
var result = when.defer();


setTimeout (function() {
//simulate a variable-length operation
console.log(util.inspect(record));
result.resolve('record processed');
}, Math.random()*5);


return result.promise;
}


var runCursor = function (cursor : MongoCursor){
cursor.next(function(error : any, record : any){
if (error){
console.log('an error occurred: ' + error);
return;
}
if (record){
processOneRecordAsync(record).then(function(r){
setTimeout(function() {runCursor(cursor)}, 1);
});
}
else{
//cursor up
doneReading.resolve('done reading data.');
}
});
}


dataCol.find({}, function(error, cursor : MongoCursor){
if (!error)
{
setTimeout(function() {runCursor(cursor)}, 1);
}
});


doneReading.promise.then(function(message : string){
//message='done reading data'
console.log(message);
});
});
});

你可以用一个未来:

myCollection.find({}, function(err, resultCursor) {
resultCursor.count(Meteor.bindEnvironment(function(err,count){
for(var i=0;i<count;i++)
{
var itemFuture=new Future();


resultCursor.nextObject(function(err,item)){
itemFuture.result(item);
}


var item=itemFuture.wait();
//do what you want with the item,
//and continue with the loop if so


}
}));
});

如果有人正在寻找一种做到这一点的专用方法(而不是使用 nextObject 的回调) ,那么下面就是这种方法。我使用的是 Node v4.2.2和 mongo 驱动程序 v2.1.7。这是 Cursor.forEach()的一种异步系列版本:

function forEachSeries(cursor, iterator) {
return new Promise(function(resolve, reject) {
var count = 0;
function processDoc(doc) {
if (doc != null) {
count++;
return iterator(doc).then(function() {
return cursor.next().then(processDoc);
});
} else {
resolve(count);
}
}
cursor.next().then(processDoc);
});
}

要使用它,需要传递游标和一个异步操作每个文档的迭代器(就像对 Cursor.forEach 所做的那样)。迭代器需要返回一个承诺,就像大多数 mongodb 本机驱动程序函数所做的那样。

比方说,你想更新集合 test中的所有文档,你可以这样做:

var theDb;
MongoClient.connect(dbUrl).then(function(db) {
theDb = db;     // save it, we'll need to close the connection when done.
var cur = db.collection('test').find();


return forEachSeries(cur, function(doc) {    // this is the iterator
return db.collection('test').updateOne(
{_id: doc._id},
{$set: {updated: true}}       // or whatever else you need to change
);
// updateOne returns a promise, if not supplied a callback. Just return it.
});
})
.then(function(count) {
console.log("All Done. Processed", count, "records");
theDb.close();
})

这可以通过使用 setDirect 来处理大型数据集:

var cursor = collection.find({filter...}).cursor();


cursor.nextObject(function fn(err, item) {
if (err || !item) return;


setImmediate(fnAction, item, arg1, arg2, function() {
cursor.nextObject(fn);
});
});


function fnAction(item, arg1, arg2, callback) {
// Here you can do whatever you want to do with your item.
return callback();
}

使用 async/await的更现代的方法:

const cursor = db.collection("foo").find({});
while(await cursor.hasNext()) {
const doc = await cursor.next();
// process doc here
}

备注:

  • 这可能是 甚至更多的简单做时,异步迭代器到达。
  • 您可能需要添加 try/catch 来进行错误检查。
  • 包含函数应该是 async,或者代码应该包装在 (async function() { ... })()中,因为它使用 await
  • 如果需要,可以在 while 循环的末尾添加 await new Promise(resolve => setTimeout(resolve, 1000));(暂停1秒钟) ,以显示它一个接一个地执行进程文档。

因为 node.js v10.3 可以使用异步迭代器

const cursor = db.collection('foo').find({});
for await (const doc of cursor) {
// do your thing
// you can even use `await myAsyncOperation()` here
}

Jake Archibald 写了关于异步迭代器的 一篇很棒的博客文章,我是在读了@user993683的回答后才知道的。