What is the best way to limit concurrency when using ES6's Promise.all()?

I have some code that iterates over a list that was queried out of a database and makes an HTTP request for each element in that list. That list can sometimes be a reasonably large number (in the thousands), and I would like to make sure I am not hitting a web server with thousands of concurrent HTTP requests.

An abbreviated version of this code currently looks something like this...

function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
        .then(() => {
          /* snip */
          resolve();
        });
    });
  });
}


Promise.all(getCounts()).then(() => { /* snip */});

This code is running on Node 4.3.2. To reiterate: can Promise.all be managed so that only a certain number of promises are in progress at any given time?


Note that Promise.all() doesn't trigger the promises to start their work; creating the promise itself does.

With that in mind, one solution would be to check, whenever a promise resolves, whether a new promise should be started or whether you are already at the limit.
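
A minimal sketch of that idea, assuming tasks is an array of functions that each return a promise (runLimited and every name below are illustrative, not from any library):

// Sketch: start up to `limit` tasks, and start the next queued task
// whenever a running one settles.
function runLimited(tasks, limit) {
  return new Promise((resolve, reject) => {
    let next = 0, active = 0, done = 0;
    const results = [];
    const startMore = () => {
      if (done === tasks.length) return resolve(results);
      while (active < limit && next < tasks.length) {
        const i = next++;
        active++;
        tasks[i]().then(r => {
          results[i] = r; active--; done++;
          startMore(); // a slot freed up, start another task
        }, reject);
      }
    };
    startMore();
  });
}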

However, there is really no need to reinvent the wheel here:

var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here.
  // If there is work left to be done, return the next work item as a promise.
  // Otherwise, return null to indicate that all promises have been created.
  // Scroll down for an example.
}

// The number of promises to process simultaneously.
var concurrency = 3

// Create a pool.
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool.
var poolPromise = pool.start()

// Wait for the pool to settle.
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
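
For the question's use case, the producer might look something like this (a sketch assuming the users array and remoteServer.getCount() from the question; the pool calls the producer whenever a slot frees up and stops once it returns null):

var userIndex = 0

var promiseProducer = function () {
  if (userIndex >= users.length) {
    return null // no more work; the pool settles once in-flight promises finish
  }
  return remoteServer.getCount(users[userIndex++]) // one HTTP request per call
}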

Instead of using promises to limit http requests, use node's built-in http.Agent.maxSockets. This removes the requirement of using a library or writing your own pooling code, and has the added advantage of more control over what you're limiting.

agent.maxSockets

By default set to Infinity. Determines how many concurrent sockets the agent can have open per origin. Origin is either a 'host:port' or 'host:port:localAddress' combination.

For example:

var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);

If making multiple requests to the same origin, it might also benefit you to set keepAlive to true (see the docs above for more info).
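
For instance, a keep-alive agent might be configured like this (a small sketch; the host name is a placeholder):

var http = require('http');

// Reuse up to 5 warm sockets per origin instead of opening a new one per request
var keepAliveAgent = new http.Agent({ keepAlive: true, maxSockets: 5 });

var request = http.request(
  { host: 'example.com', agent: keepAliveAgent },
  function (res) { /* handle response */ }
);
request.end();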

Bluebird's Promise.map can take a concurrency option to control how many promises should run in parallel. Sometimes it is easier than .all because you don't need to create the promise array.

const Promise = require('bluebird')

function getCounts() {
  return Promise.map(users, user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
        .then(() => {
          /* snip */
          resolve();
        });
    });
  }, {concurrency: 10}); // <---- at most 10 http requests at a time
}

If you know how iterators work and how they are consumed, you don't need any extra library, since it becomes very easy to build your own concurrency yourself. Let me demonstrate:

/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()

// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)

  // notice how this loop continues the same iterator
  // and consumes the rest of the iterator, making the
  // outer loop not log any more x's
  for (const y of iterator) {
    console.log('y:', y)
  }
}

We can use the same iterator and share it across workers.

If you use .entries() instead of .values(), you get an iterator that yields [index, value] pairs, which I will demonstrate below with a concurrency of 2:

const sleep = t => new Promise(rs => setTimeout(rs, t))
const iterator = Array.from('abcdefghij').entries()
// const results = [] || Array(someLength)

async function doWork (iterator) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(index + ': ' + item)

    // in case you need to store the results in order
    // results[index] = item + item

    // or if the order does not matter
    // results.push(item + item)
  }
}

const workers = Array(2).fill(iterator).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.allSettled(workers).then(console.log.bind(null, 'done'))

The beauty of this is that you can have a generator function instead of having everything prepared up front.
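
For example, the same two workers could pull from a generator that produces work lazily instead of from a prebuilt array (a sketch; produce is illustrative and yields [index, item] pairs so the doWork loop above can still destructure them):

// Hypothetical lazy producer: items only come into existence as workers pull them
function* produce () {
  for (let i = 0; i < 10000; i++) {
    yield [i, 'item-' + i] // [index, item] pairs, matching what .entries() yields
  }
}

const lazyWorkers = Array(2).fill(produce()).map(doWork)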

What's even more awesome is that you can do stream.Readable.from(iterator) in node (and eventually in whatwg streams as well). And with transferable ReadableStreams, this makes the potential very useful in the future if you are working with web workers for performance.


Note: the difference between this and the example async-pool is that it spawns two workers, so if one worker throws an error for some reason at, say, index 5, it won't stop the other worker from doing the rest. You just go from 2 concurrency down to 1 (so it won't stop there). So my advice is that you catch all errors inside the doWork function.
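
A minimal sketch of that advice, wrapping the loop body of doWork from above in a try/catch so a single failing item cannot kill a worker:

async function doWork (iterator) {
  for (let [index, item] of iterator) {
    try {
      await sleep(1000)
      console.log(index + ': ' + item)
    } catch (err) {
      // record the failure and keep pulling items, so concurrency stays at 2
      console.error('item ' + index + ' failed:', err)
    }
  }
}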

So I tried to make some of the examples shown here work for my code, but since this was only for an import script and not production code, using the npm package batch-promises was surely the easiest path for me.


batch-promises
Easily batch promises


NOTE: Requires runtime to support Promise or to be polyfilled.


Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.


Use:
import batchPromises from 'batch-promises';

batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {

  // The iteratee will fire after each batch resulting in the following behaviour:
  // @ 100ms resolve items 1 and 2 (first batch of 2)
  // @ 200ms resolve items 3 and 4 (second batch of 2)
  // @ 300ms resolve remaining item 5 (last remaining batch)
  setTimeout(() => {
    resolve(i);
  }, 100);
}))
.then(results => {
  console.log(results); // [1,2,3,4,5]
});

This is what I did using Promise.race, inside my code here:

const identifyTransactions = async function() {
  let promises = []
  let concurrency = 0
  for (let tx of this.transactions) {
    if (concurrency > 4)
      await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
    promises.push(tx.identifyTransaction())
    concurrency++
  }
  if (promises.length > 0)
    await Promise.race(promises) //resolve the rest
}

If you want to see an example: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/

p-limit

I have compared promise concurrency limitation with a custom script, bluebird, es6-promise-pool, and p-limit. I believe that p-limit has the most simple, stripped-down implementation for this need. See their documentation.

Requirements

To be compatible with async in the example

My example

In this example, we need to run a function for each URL in the array (like, maybe an API request). Here this is called fetchData(). If we had an array of thousands of items to process, concurrency would definitely be useful to save on CPU and memory resources.

const pLimit = require('p-limit');

// Example concurrency of 3 promises at once
const limit = pLimit(3);

let urls = [
  "http://www.exampleone.com/",
  "http://www.exampletwo.com/",
  "http://www.examplethree.com/",
  "http://www.examplefour.com/",
]

// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {
  // wrap the function we are calling in the limit function we defined above
  return limit(() => fetchData(url));
});

(async () => {
  // Only three promises are run at once (as defined above)
  const result = await Promise.all(promises);
  console.log(result);
})();

The console log result is an array of the resolved promises' response data.

Here is a basic example with streaming and 'p-limit', which streams an http read stream to mongo db:

const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const JSONStream = require('JSONStream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;

const pipeline = util.promisify(stream.pipeline)

const outputDBConfig = {
  dbURL: 'yr-db-url',
  collection: 'some-collection'
};
const limit = pLimit(3);

const yrAsyncStreamingFunction = async (readStream) => {
  const mongoWriteStream = streamToMongoDB(outputDBConfig);
  const mapperStream = es.map((data, done) => {
    let someDataPromise = limit(() => yr_async_call_to_somewhere())

    someDataPromise.then(
      function handleResolve(someData) {
        data.someData = someData;
        done(null, data);
      },
      function handleError(error) {
        done(error)
      }
    );
  })

  await pipeline(
    readStream,
    JSONStream.parse('*'),
    mapperStream,
    mongoWriteStream
  );
}

If you don't want to use an external library, recursion is the answer:

downloadAll(someArrayWithData){
  var self = this;

  var tracker = function(next){
    return self.someExpensiveRequest(someArrayWithData[next])
      .then(function(){
        next++; //This updates the next in the tracker function parameter
        if(next < someArrayWithData.length){ //Did I finish processing all my data?
          return tracker(next); //Go to the next promise
        }
      });
  }

  return tracker(0);
}

It can be solved using recursion.

The idea is that initially you send the maximum allowed number of requests, and each of these requests recursively continues to send itself upon its completion.

function batchFetch(urls, concurrentRequestsLimit) {
  return new Promise(resolve => {
    var documents = [];
    var index = 0;

    function recursiveFetch() {
      if (index === urls.length) {
        return;
      }
      fetch(urls[index++])
        .then(r => r.text()) // resolve the body text before storing it
        .then(text => {
          documents.push(text);
          if (documents.length === urls.length) {
            resolve(documents);
          } else {
            recursiveFetch();
          }
        });
    }

    for (var i = 0; i < concurrentRequestsLimit; i++) {
      recursiveFetch();
    }
  });
}

var sources = [
  'http://www.example_1.com/',
  'http://www.example_2.com/',
  'http://www.example_3.com/',
  ...
  'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => {
  console.log(documents);
});

I recommend the library async-pool: https://github.com/rxaviers/async-pool

npm install tiny-async-pool

Description:

Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7

asyncPool runs multiple promise-returning & async functions in a limited concurrency pool. It rejects immediately as soon as one of the promises rejects. It resolves when all the promises complete. It calls the iterator function as soon as possible (under the concurrency limit).

Usage:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Iteration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.

Use Array.prototype.splice:

while (funcs.length) {
  // 100 at a time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
  • @tcooc's answer was quite cool. Didn't know about it and will leverage it in the future.
  • I also enjoyed @MatthewRideout's answer, but it uses an external library!!

Whenever possible, I give a shot at developing this kind of thing on my own, rather than going for a library. You end up learning a lot of concepts which seemed daunting before.

class Pool{
  constructor(maxAsync) {
    this.maxAsync = maxAsync;
    this.asyncOperationsQueue = [];
    this.currentAsyncOperations = 0
  }

  runAnother() {
    if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) {
      this.currentAsyncOperations += 1;
      this.asyncOperationsQueue.pop()()
        .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() })
    }
  }

  add(f){  // the argument f is a function of signature () => Promise
    return new Promise((resolve, reject) => {
      this.asyncOperationsQueue.push(
        () => f().then(resolve).catch(reject)
      )
      // try to start it right away, in case a slot is free
      this.runAnother();
    })
  }
}


//#######################################################
//                        TESTS
//#######################################################

function dbCall(id, timeout, fail) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (fail) {
        reject(`Error for id ${id}`);
      } else {
        resolve(id);
      }
    }, timeout)
  })
}

const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);

const cappedPool = new Pool(2);

const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))

This approach provides a nice API, similar to thread pools in scala/java.
After creating one instance of the pool with const cappedPool = new Pool(2), you provide promises to it simply with cappedPool.add(() => myPromise).
Obviously we must make sure that the promise does not start immediately, and that is why we must "provide it lazily" with the help of a function.

Most importantly, notice that the result of the method add is a Promise which will be completed/resolved with the value of your original promise!

const resultPromise = cappedPool.add( () => dbCall(...))
resultPromise
  .then( actualResult => {
    // Do something with the result from the DB
  }
)

Unfortunately there is no way to do it with native Promise.all, so you have to be creative.

This is the quickest, most concise way I could find without using any outside libraries.

It makes use of a newer javascript feature called an iterator. The iterator basically keeps track of which items have been processed and which haven't.

In order to use it in code, you create an array of async functions. Each async function asks the same iterator for the next item that needs to be processed. Each function processes its own item asynchronously, and when done asks the iterator for a new one. Once the iterator runs out of items, all the functions complete.

Thanks to @Endless for the inspiration.

const items = [
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2'
]

const concurrency = 5

Array(concurrency).fill(items.entries()).map(async (cursor) => {
  for (let [index, url] of cursor){
    console.log('getting url is ', index, url)
    // run your async task instead of this next line
    var text = await fetch(url).then(res => res.text())
    console.log('text is', text.slice(0, 20))
  }
})

Here is my ES7 solution, a copy-paste friendly and feature-complete Promise.all()/map() alternative with a concurrency limit.

Similar to Promise.all(), it maintains return order as well as a fallback for non-promise return values.

I also included a comparison of the different implementations, as it illustrates some aspects a few of the other solutions have missed.

Usage

const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4

Implementation

async function asyncBatch(args, fn, limit = 8) {
  // Copy arguments to avoid side effects
  args = [...args];
  const outs = [];
  while (args.length) {
    const batch = args.splice(0, limit);
    const out = await Promise.all(batch.map(fn));
    outs.push(...out);
  }
  return outs;
}

async function asyncPool(args, fn, limit = 8) {
  return new Promise((resolve) => {
    // Copy arguments to avoid side effect, reverse queue as
    // pop is faster than shift
    const argQueue = [...args].reverse();
    let count = 0;
    const outs = [];
    const pollNext = () => {
      if (argQueue.length === 0 && count === 0) {
        resolve(outs);
      } else {
        while (count < limit && argQueue.length) {
          const index = args.length - argQueue.length;
          const arg = argQueue.pop();
          count += 1;
          const out = fn(arg);
          const processOut = (out, index) => {
            outs[index] = out;
            count -= 1;
            pollNext();
          };
          if (typeof out === 'object' && out.then) {
            out.then(out => processOut(out, index));
          } else {
            processOut(out, index);
          }
        }
      }
    };
    pollNext();
  });
}

Comparison

// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() => {
  console.log(delay);
  resolve(delay);
}, delay));


// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];


// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.


// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms


// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms


// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 20, 30
// total time: 45ms


console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3


// Conclusion: Execution order and performance is different,
// but return order is still identical

Conclusion

asyncPool() should be the best solution, as it allows new requests to start as soon as any previous one finishes.

asyncBatch() is included as a comparison, as its implementation is simpler to understand, but it should be slower in performance, since all requests in the same batch are required to finish before the next batch can start.

In this contrived example, the non-limited vanilla Promise.all() is of course the fastest, while the others could perform more desirably in a real-world congestion scenario.

Update

The async-pool library that others have already suggested is probably a better alternative to my implementation, as it works almost identically and has a more concise implementation with a clever usage of Promise.race(): https://github.com/rxaviers/async-pool/blob/master/lib/es7.js

Hopefully my answer can still serve an educational value.

So many good solutions. I started out with the elegant solution posted by @Endless and ended up with this little extension method that does not use any external libraries nor does it run in batches (although it assumes you have features like async, etc):

Promise.allWithLimit = async (taskList, limit = 5) => {
  const iterator = taskList.entries();
  let results = new Array(taskList.length);
  let workerThreads = new Array(limit).fill(0).map(() =>
    new Promise(async (resolve, reject) => {
      try {
        let entry = iterator.next();
        while (!entry.done) {
          let [index, promise] = entry.value;
          try {
            results[index] = await promise;
          }
          catch (err) {
            results[index] = err;
          }
          // advance whether the promise fulfilled or rejected,
          // so a rejection can't loop on the same entry forever
          entry = iterator.next();
        }
        // No more work to do
        resolve(true);
      }
      catch (err) {
        // This worker is dead
        reject(err);
      }
    }));

  await Promise.all(workerThreads);
  return results;
};



const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => {
  let n = (i + 1) * 5;
  setTimeout(() => {
    console.log(`Did nothing for ${n} seconds`);
    resolve(n);
  }, n * 1000);
}));

var results = Promise.allWithLimit(demoTasks);

Building on the answer posted by @decelatedcaviar, I created a 'batch' utility function that takes as arguments: an array of values, a concurrency limit, and a processing function. Yes, I realize that this way is more batch processing than true concurrency, but if the goal is to limit an excessive number of HTTP calls at one time, I go with this approach due to its simplicity and no need for an external library.

async function batch(o) {
  let arr = o.arr
  let resp = []
  while (arr.length) {
    let subset = arr.splice(0, o.limit)
    let results = await Promise.all(subset.map(o.process))
    resp.push(results)
  }
  return [].concat.apply([], resp)
}

let arr = []
for (let i = 0; i < 250; i++) { arr.push(i) }

async function calc(val) { return val * 100 }

(async () => {
  let resp = await batch({
    arr: arr,
    limit: 100,
    process: calc
  })
  console.log(resp)
})();

One more solution with a custom promise library (CPromise):

import { CPromise } from "c-promise2";
import cpFetch from "cp-fetch";

const promise = CPromise.all(
  function* () {
    const urls = [
      "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
      "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
      "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
      "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
      "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
      "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
      "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
    ];

    for (const url of urls) {
      yield cpFetch(url); // add a promise to the pool
      console.log(`Request [${url}] completed`);
    }
  },
  { concurrency: 2 }
).then(
  (v) => console.log(`Done: `, v),
  (e) => console.warn(`Failed: ${e}`)
);

// yeah, we are able to cancel the task and abort pending network requests
// setTimeout(() => promise.cancel(), 4500);

Or the same thing using the built-in mapper option:

import { CPromise } from "c-promise2";
import cpFetch from "cp-fetch";

const promise = CPromise.all(
  [
    "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
    "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
    "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
    "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
    "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
    "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
    "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
  ],
  {
    mapper: (url) => {
      console.log(`Request [${url}]`);
      return cpFetch(url);
    },
    concurrency: 2
  }
).then(
  (v) => console.log(`Done: `, v),
  (e) => console.warn(`Failed: ${e}`)
);

// yeah, we are able to cancel the task and abort pending network requests
// setTimeout(() => promise.cancel(), 4500);


Warning: this has not been benchmarked for efficiency, and it does a lot of array copying/creation.

If you want a more functional approach, you could do something like:

import chunk from 'lodash.chunk';


const maxConcurrency = (max) => (dataArr, promiseFn) =>
  chunk(dataArr, max).reduce(
    async (agg, batch) => [
      ...(await agg),
      ...(await Promise.all(batch.map(promiseFn)))
    ],
    []
  );

Then you could use it like:

const randomFn = (data) =>
  new Promise((res) => setTimeout(
    () => res(data + 1),
    Math.random() * 1000
  ));

const result = await maxConcurrency(5)(
  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
  randomFn
);
console.log('result+++', result);

Semaphore is a well-known concurrency primitive that was designed to solve exactly this kind of problem. It's a very universal construct; implementations of Semaphore exist in many languages. This is how one would use a Semaphore to solve this issue:

const { Semaphore } = require('async-mutex');

async function main() {
  const s = new Semaphore(100);
  const res = await Promise.all(
    users.map((user) =>
      s.runExclusive(() => remoteServer.getCount(user))
    )
  );
  return res;
}

I'm using the Semaphore implementation from async-mutex; it has decent documentation and TypeScript support.

If you want to dig deep into topics like this, you can take a look at the book "The Little Book of Semaphores", which is freely available as a PDF (English only).

If your goal is to slow down Promise.all to avoid rate limits or overloading:

Here's my implementation:

async function promiseAllGentle(arr, batchSize = 5, sleep = 50) {
  let output = [];
  while (arr.length) {
    const batchResult = await Promise.all(arr.splice(0, batchSize));
    output = [...output, ...batchResult];
    await new Promise((res) => setTimeout(res, sleep));
  }
  return output;
}

As everyone else in this answer thread has pointed out, Promise.all() won't do the right thing if you need to limit concurrency. But ideally you shouldn't even want to wait until all of the Promises are done before processing them.

Instead, you want to process each result as soon as it becomes available, so you don't have to wait for the very last promise to finish before you start iterating over them.

So, here's a code sample that does that, based partly on the answer by Endless and also on this answer by T.J. Crowder.

// example tasks that sleep and return a number
// in real life, you'd probably fetch URLs or something
const tasks = [];
for (let i = 0; i < 20; i++) {
  tasks.push(async () => {
    console.log(`start ${i}`);
    await sleep(Math.random() * 1000);
    console.log(`end ${i}`);
    return i;
  });
}
function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }

(async () => {
  for await (let value of runTasks(3, tasks.values())) {
    console.log(`output ${value}`);
  }
})();

async function* runTasks(maxConcurrency, taskIterator) {
  // Each async iterator is a worker, polling for tasks from the shared taskIterator
  // Sharing the iterator ensures that each worker gets unique tasks.
  const asyncIterators = new Array(maxConcurrency);
  for (let i = 0; i < maxConcurrency; i++) {
    asyncIterators[i] = (async function* () {
      for (const task of taskIterator) yield await task();
    })();
  }
  yield* raceAsyncIterators(asyncIterators);
}

async function* raceAsyncIterators(asyncIterators) {
  async function nextResultWithItsIterator(iterator) {
    return { result: await iterator.next(), iterator: iterator };
  }
  /** @type Map<AsyncIterator<T>,
      Promise<{result: IteratorResult<T>, iterator: AsyncIterator<T>}>> */
  const promises = new Map(asyncIterators.map(iterator =>
    [iterator, nextResultWithItsIterator(iterator)]));
  while (promises.size) {
    const { result, iterator } = await Promise.race(promises.values());
    if (result.done) {
      promises.delete(iterator);
    } else {
      promises.set(iterator, nextResultWithItsIterator(iterator));
      yield result.value;
    }
  }
}

There's a lot of magic here; let me explain it.

This solution is built around async generator functions, which many JS developers may not be familiar with.

A generator function (aka a function* function) returns a "generator", an iterator of results. Generator functions are allowed to use the yield keyword where you might otherwise have used a return keyword. The first time a caller calls next() on the generator (or uses a for...of loop), the function* function runs until it yields a value; that becomes the iterator's next() value. But on subsequent calls to next(), the generator function resumes from the yield statement, right where it left off, even if it's in the middle of a loop. (You can also use yield*, to yield all the results of another generator function.)
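
A quick illustration of those mechanics (my own sketch, not part of the original answer):

function* countTo(n) {
  for (let i = 1; i <= n; i++) {
    yield i; // execution pauses here and resumes on the next call to next()
  }
}

function* twice(n) {
  yield* countTo(n); // yield* forwards every value from another generator
  yield* countTo(n);
}

console.log([...twice(2)]); // prints [1, 2, 1, 2]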

An "async generator function" (async function*) is a generator function that returns an "async iterator", which is an iterator of promises. You can call for await...of on an async iterator. Async generator functions can use the await keyword, as you might do in any async function.
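
And a minimal async counterpart (again an illustrative sketch):

// Each next() on an async generator returns a promise for the next result
async function* delayedNumbers() {
  for (let i = 1; i <= 3; i++) {
    await new Promise(r => setTimeout(r, 100)); // await is allowed here
    yield i;
  }
}

(async () => {
  for await (const n of delayedNumbers()) {
    console.log(n); // 1, 2, 3 (one value every ~100 ms)
  }
})();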

In the example, we call runTasks() with an array of task functions; runTasks() is an async generator function, so we can call it with a for await...of loop. Each time the loop runs, we process the result of the latest completed task.

runTasks() creates N async iterators, the workers. (Note that the workers are initially defined as async generator functions, but we immediately invoke each function and store each resulting async iterator in the asyncIterators array.) The example calls runTasks with 3 concurrent workers, so no more than 3 tasks launch at the same time. When any task completes, we immediately queue up the next task. (This is superior to "batching", where you do 3 tasks at once, await all three of them, and don't start the next batch of three until the entire previous batch has finished.)

runTasks() concludes by "racing" its async iterators with yield* raceAsyncIterators(). raceAsyncIterators() is like Promise.race(), but it races N iterators of Promises instead of just N Promises; it returns an async iterator that yields the results of resolved Promises.

raceAsyncIterators() starts by defining a promises Map from each of the iterators to promises. Each promise is a promise for an iteration result along with the iterator that generated it.

With the promises map, we can Promise.race() the values of the map, giving us the winning iteration result and its iterator. If the iterator is completely done, we remove it from the map; otherwise we replace its promise in the promises map with the iterator's next() promise and yield result.value.

In conclusion, runTasks() is an async generator function that yields the results of racing N concurrent async iterators of tasks, so the end user can just do for await (let value of runTasks(3, tasks.values())) to process each result as soon as it becomes available.

I've been using the bottleneck library, which I actually really liked, but in my case it wasn't releasing memory and kept tanking long-running jobs... which isn't great for running the massive jobs that you likely want a throttling/concurrency library for in the first place.

I needed a simple, low-overhead, easy-to-maintain solution. I also wanted something that kept the pool topped up, rather than simply batching predefined chunks... In the case of a downloader, this will stop nGB files from hogging your queue for minutes/hours at a time, even though the rest of the batch finished ages ago.

This is the Node.js v16+, no-dependency, async generator solution I've been using:

const promiseState = function( promise ) {
  // A promise could never resolve to a unique symbol unless it was in this scope
  const control = Symbol();

  // This helps us determine the state of the promise... A little heavy, but it beats a third-party promise library. The control is the second element passed to Promise.race() since it will only resolve first if the promise being tested is pending.
  return Promise
    .race([ promise, control ])
    .then( value => ( value === control ) ? 'pending' : 'fulfilled' )
    .catch( () => 'rejected' );
}

const throttle = async function* ( reservoir, promiseFunction, highWaterMark ) {
  let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseFunction( item ) );

  while ( iterable.length > 0 ) {
    // When a promise has resolved we have space to top it up to the high water mark...
    await Promise.any( iterable );

    const pending = [];
    const resolved = [];

    // This identifies the promise(s) that have resolved so that we can yield them
    for ( const currentValue of iterable ) {
      if ( await promiseState( currentValue ) === 'pending' ) {
        pending.push( currentValue );
      } else {
        resolved.push( currentValue );
      }
    }

    // Put the remaining promises back into iterable, and top it up to the high water mark
    iterable = [
      ...pending,
      ...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseFunction( value ) )
    ];

    yield Promise.allSettled( resolved );
  }
}

// This is just an example of what would get passed as "promiseFunction"... This can be the function that returns your HTTP request promises
const getTimeout = delay => new Promise( (resolve, reject) => setTimeout(resolve, delay, delay) );

// This is just the async IIFE that bootstraps this example
( async () => {

  const test = [ 1000, 2000, 3000, 4000, 5000, 6000, 1500, 2500, 3500, 4500, 5500, 6500 ];

  for await ( const timeout of throttle( test, getTimeout, 4 ) ) {
    console.log( timeout );
  }

} )();

My solution is to create chunks and use the .reduce function to wait for each chunk's Promise.all to finish. I also add some delay if the promises have call limits.

export function delay(ms: number) {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

export const chunk = <T>(arr: T[], size: number): T[][] => [
  ...Array(Math.ceil(arr.length / size)),
].map((_, i) => arr.slice(size * i, size + size * i));

const myIdList = []; // all items
const groupedIdList = chunk(myIdList, 20); // grouped by 20 items

await groupedIdList.reduce(async (prev, subIdList) => {
  await prev;
  // Make sure we wait for 500 ms after processing every page to prevent overloading the calls.
  const data = await Promise.all(subIdList.map(myPromise));
  await delay(500);
}, Promise.resolve());

This solution uses an async generator to manage concurrent promises with vanilla javascript. The throttle generator takes 3 arguments:

  • An array of values to be supplied as arguments to a promise-generating function (e.g., an array of URLs).
  • A function that returns a promise (e.g., returns a promise for an HTTP request).
  • An integer that represents the maximum concurrent promises allowed.

Promises are only instantiated as required, in order to reduce memory consumption. Results can be iterated over using a for await...of statement.

The example below provides a function to check promise state, the throttle async generator, and a simple function that returns a promise based on setTimeout. The async IIFE at the end defines the reservoir of timeout values, sets up the async iterable returned by throttle, and then iterates over the results as they resolve.

If you would like a more complete example for HTTP requests, let me know in the comments.

Please note that Node.js 16+ is required in order for the async generators to work.

const promiseState = function( promise ) {
  const control = Symbol();

  return Promise
    .race([ promise, control ])
    .then( value => ( value === control ) ? 'pending' : 'fulfilled' )
    .catch( () => 'rejected' );
}

const throttle = async function* ( reservoir, promiseClass, highWaterMark ) {
  let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseClass( item ) );

  while ( iterable.length > 0 ) {
    await Promise.any( iterable );

    const pending = [];
    const resolved = [];

    for ( const currentValue of iterable ) {
      if ( await promiseState( currentValue ) === 'pending' ) {
        pending.push( currentValue );
      } else {
        resolved.push( currentValue );
      }
    }

    console.log({ pending, resolved, reservoir });

    iterable = [
      ...pending,
      ...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseClass( value ) )
    ];

    yield Promise.allSettled( resolved );
  }
}

const getTimeout = delay => new Promise( ( resolve, reject ) => {
  setTimeout(resolve, delay, delay);
} );

( async () => {
  const test = [ 1100, 1200, 1300, 10000, 11000, 9000, 5000, 6000, 3000, 4000, 1000, 2000, 3500 ];

  const throttledRequests = throttle( test, getTimeout, 4 );

  for await ( const timeout of throttledRequests ) {
    console.log( timeout );
  }
} )();

A good solution for controlling the maximum number of promises/requests is to split your list of requests into pages, and produce requests for only one page at a time.

The example below makes use of the iter-ops library:

import {pipeAsync, map, page, wait} from 'iter-ops';

const i = pipeAsync(
  users, // make it asynchronous
  page(10), // split into pages of 10 items in each
  map(p => Promise.all(p.map(u => u.remoteServer.getCount(u)))), // map into requests
  wait() // resolve each page in the pipeline
);

// below triggers processing page-by-page:

for await(const p of i) {
  //=> p = resolved page of data
}

This way it won't try to create more requests/promises than the size of one page.

Using tiny-async-pool's ES9 for await...of API, you can do the following:

const asyncPool = require("tiny-async-pool");
const getCount = async (user) => ([user, await remoteServer.getCount(user)]);
const concurrency = 2;

for await (const [user, count] of asyncPool(concurrency, users, getCount)) {
  console.log(user, count);
}

The asyncPool function above returns an async iterator that yields as soon as a promise completes (under the concurrency limit), and it rejects immediately as soon as one of the promises rejects.

It is possible to limit requests to the server by using https://www.npmjs.com/package/job-pipe

Basically you create a pipe and tell it how many concurrent requests you want:

const pipe = createPipe({ throughput: 6, maxQueueSize: Infinity })

Then you take the function which performs the call and force it through the pipe to create a limited number of simultaneous calls:

const makeCall = async () => {...}
const limitedMakeCall = pipe(makeCall)

Finally, you call this method as many times as you need, as if it were unchanged, and it will limit itself on how many parallel executions it can handle:

await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
....
await limitedMakeCall()

Profit.

I suggest not downloading packages and not writing hundreds of lines of code:

async function async_arr<T1, T2>(
  arr: T1[],
  func: (x: T1) => Promise<T2> | T2, //can be sync or async
  limit = 5
) {
  let results: T2[] = [];
  let workers = [];
  let current = Math.min(arr.length, limit);
  async function process(i) {
    if (i < arr.length) {
      results[i] = await Promise.resolve(func(arr[i]));
      await process(current++);
    }
  }
  for (let i = 0; i < current; i++) {
    workers.push(process(i));
  }
  await Promise.all(workers);
  return results;
}

The concurrent function below will return a Promise which resolves to an array of resolved promise values, while implementing a concurrency limit. No 3rd-party library needed.

// waits 50 ms then resolves to the passed-in arg
const sleepAndResolve = s => new Promise(rs => setTimeout(()=>rs(s), 50))

// queue 100 promises
const funcs = []
for(let i=0; i<100; i++) funcs.push(()=>sleepAndResolve(i))

//run the promises with a max concurrency of 10
concurrent(10,funcs)
  .then(console.log) // prints [0,1,2...,99]
  .catch(()=>console.log("there was an error"))

/**
 * Run concurrent promises with a maximum concurrency level
 * @param concurrency The number of concurrently running promises
 * @param funcs An array of functions that return promises
 * @returns a promise that resolves to an array of the resolved values from the promises returned by funcs
 */
function concurrent(concurrency, funcs) {
  return new Promise((resolve, reject) => {
    let index = -1;
    const p = [];
    for (let i = 0; i < Math.max(1, Math.min(concurrency, funcs.length)); i++)
      runPromise();
    function runPromise() {
      if (++index < funcs.length)
        (p[p.length] = funcs[index]()).then(runPromise).catch(reject);
      else if (index === funcs.length)
        Promise.all(p).then(resolve).catch(reject);
    }
  });
}

Here's the TypeScript version, if you are interested:

/**
 * Run concurrent promises with a maximum concurrency level
 * @param concurrency The number of concurrently running promises
 * @param funcs An array of functions that return promises
 * @returns a promise that resolves to an array of the resolved values from the promises returned by funcs
 */
function concurrent<V>(concurrency:number, funcs:(()=>Promise<V>)[]):Promise<V[]> {
  return new Promise((resolve,reject)=>{
    let index = -1;
    const p:Promise<V>[] = []
    for(let i=0; i<Math.max(1,Math.min(concurrency, funcs.length)); i++) runPromise()
    function runPromise() {
      if (++index < funcs.length) (p[p.length] = funcs[index]()).then(runPromise).catch(reject)
      else if (index === funcs.length) Promise.all(p).then(resolve).catch(reject)
    }
  })
}

Here's my recipe, based on killdash9's answer. It allows choosing the behaviour on exceptions (Promise.all vs Promise.allSettled).

// Given an array of async functions, runs them in parallel,
// with at most maxConcurrency simultaneous executions
// Except for that, behaves the same as Promise.all,
// unless allSettled is true, where it behaves as Promise.allSettled

function concurrentRun(maxConcurrency = 10, funcs = [], allSettled = false) {
  if (funcs.length <= maxConcurrency) {
    const ps = funcs.map(f => f());
    return allSettled ? Promise.allSettled(ps) : Promise.all(ps);
  }
  return new Promise((resolve, reject) => {
    let idx = -1;
    const ps = new Array(funcs.length);
    function nextPromise() {
      idx += 1;
      if (idx < funcs.length) {
        (ps[idx] = funcs[idx]()).then(nextPromise).catch(allSettled ? nextPromise : reject);
      } else if (idx === funcs.length) {
        (allSettled ? Promise.allSettled(ps) : Promise.all(ps)).then(resolve).catch(reject);
      }
    }
    for (let i = 0; i < maxConcurrency; i += 1) nextPromise();
  });
}

I know there are a lot of answers already, but I ended up using a very simple solution that requires no library or sleep, just a few commands. Promise.all() simply lets you know when all the promises passed to it are finished. So, you can check on the queue intermittently to see if it is ready for more work; if so, add more processes.

For example:

// init vars
const batchSize = 5
const calls = []
// loop through data and run processes
for (let [index, data] of [1,2,3].entries()) {
  // pile on async processes
  calls.push(doSomethingAsyncWithData(data))
  // every 5th concurrent call, wait for them to finish before adding more
  if (index % batchSize === 0) await Promise.all(calls)
}
// clean up for any data to process left over if smaller than batch size
const allFinishedProcs = await Promise.all(calls)