如何在 Javascript 中并行运行异步/等待

最后,除了 IE 浏览器,所有主流浏览器的 async/await都将是 支持。 所以现在我们可以开始使用 async/await编写更具可读性的代码,但是有一个问题。很多人使用异步都是这样等待的:

const userResponse = await fetchUserAsync();
const postsResponse = await fetchPostsAsync();

虽然这段代码是可读的,但是它有一个问题,它以串行方式运行函数,直到完成对用户的提取,它才会开始提取文章。解决方案很简单,我们需要并行地获取资源。

所以我想做的是(用伪语言) :

fn task() {
result-1 = doAsync();
result-2 = doAsync();
result-n = doLongAsync();


// handle results together
combinedResult = handleResults(result-1, result-2);


lastResult = handleLastResult(result-n);
}
47435 次浏览

你可以这样写:

const responses = await Promise.all([
fetchUserAsync(),
fetchPostsAsync(),
]);


const userResponse = responses[0];
const postsResponse = responses[1];

这很简单,对吧?但有一个问题。Promise.all快速失效行为快速失效行为,这意味着,它会拒绝尽快其中一个承诺被拒绝。您可能需要一个更加健壮的解决方案,其中我们负责处理任何取出的拒绝。幸运的是有一个解决方案,只需使用 async/await就可以实现,而不需要使用 Promise.all。一个实例:

console.clear();


function wait(ms, data) {
return new Promise( resolve => setTimeout(resolve.bind(this, data), ms) );
}


/**
* This will run in series, because
* we call a function and immediately wait for it's result,
* so this will finish in 1s.
*/
async function series() {
return {
result1: await wait(500, 'seriesTask1'),
result2: await wait(500, 'seriesTask2'),
}
}


/**
* While here we call the functions first,
* then wait for the result later, so
* this will finish in 500ms.
*/
async function parallel() {
const task1 = wait(500, 'parallelTask1');
const task2 = wait(500, 'parallelTask2');


return {
result1: await task1,
result2: await task2,
}
}


async function taskRunner(fn, label) {
const startTime = performance.now();
console.log(`Task ${label} starting...`);
let result = await fn();
console.log(`Task ${label} finished in ${ Number.parseInt(performance.now() - startTime) } miliseconds with,`, result);
}


void taskRunner(series, 'series');
void taskRunner(parallel, 'parallel');




/*
* The result will be:
* Task series starting...
* Task parallel starting...
* Task parallel finished in 500 milliseconds with, { "result1": "parallelTask1", "result2": "parallelTask2" }
* Task series finished in 1001 milliseconds with, { "result1": "seriesTask1", "result2": "seriesTask2" }
*/

注意: 您将需要一个具有 async/await 启用的浏览器来运行这个代码片段(或 nodejs v7及以上版本)

这样,您可以简单地使用 try/catch来处理错误,并在 parallel函数中返回部分结果。

我刚刚也做了同样的事。通过使用承诺,然后在最后同步它们,您可以执行许多并发请求,但是要确保在完成之前返回所有结果。

请看最后一个例子: Http://javascriptrambling.blogspot.com/2017/04/to-promised-land-with-asyncawait-and.html

如果可以接受 Prose.all 的快速失败行为和解构赋值语法:

const [userResponse, postsResponse] = await Promise.all([
fetchUserAsync(),
fetchPostsAsync(),
]);

伪代码可以编写如下:

fn async task() {
result-1 = doAsync();
result-2 = doAsync();
result-n = doLongAsync();
try{
// handle results together
combinedResult = handleResults(await result-1, await result-2);
lastResult = handleLastResult(await result-n);
}
catch(err){
console.error(err)
}


}

Result-1、 result-2、 result-n 将并行运行。 BinedResult 和 lastResult 也将并行运行。 但是,一旦 result-1和 result-2可用,则返回组合结果值(即 handleResults 函数的返回值) ,而一旦 result-n 可用,则返回 lastResult 值(即 handleLastResult)。

希望这个能帮上忙

首先,您的代码是阻塞代码吗?

如果是,请记住 javascript 是单线程的,因此不能同时运行两个同步代码,例如两个循环(for 或 while)。

但是,对于 使用网络工作者来实现这一点是可能的,我设法在通用 web worker 中执行函数,而不使用分离的 js 文件。

setInterval(()=>{console.log("non blocked " + Math.random())}, 900)


console.log("start blocking code in parallel in web Worker")
console.time("blocked")


genericWorker(window, ["blockCpu", function (block){
block(10000) //This blockCpu function is defined below
return "\n\nbla bla\n" //This is catched in the resolved promise


}]).then(function (result){
console.timeEnd("blocked")
console.log("End of blocking code", result)
})
.catch(function(error) { console.log(error) })




/*  A Web Worker that does not use a File, it create that from a Blob
@cb_context, The context where the callback functions arguments are, ex: window
@cb, ["fn_name1", "fn_name2", function (fn1, fn2) {}]
The callback will be executed, and you can pass other functions to that cb
*/
function genericWorker(cb_context, cb) {
return new Promise(function (resolve, reject) {


if (!cb || !Array.isArray(cb))
return reject("Invalid data")


var callback = cb.pop()
var functions = cb


if (typeof callback != "function" || functions.some((fn)=>{return typeof cb_context[fn] != "function"}))
return reject(`The callback or some of the parameters: (${functions.toString()}) are not functions`)


if (functions.length>0 && !cb_context)
return reject("context is undefined")


callback = fn_string(callback) //Callback to be executed
functions = functions.map((fn_name)=> { return fn_string( cb_context[fn_name] ) })


var worker_file = window.URL.createObjectURL( new Blob(["self.addEventListener('message', function(e) { var bb = {}; var args = []; for (fn of e.data.functions) { bb[fn.name] = new Function(fn.args, fn.body); args.push(fn.name)}; var callback = new Function( e.data.callback.args, e.data.callback.body); args = args.map(function(fn_name) { return bb[fn_name] });  var result = callback.apply(null, args) ;self.postMessage( result );}, false)"]) )
var worker = new Worker(worker_file)


worker.postMessage({ callback: callback, functions: functions })


worker.addEventListener('error', function(error){ return reject(error.message) })


worker.addEventListener('message', function(e) {
resolve(e.data), worker.terminate()
}, false)


//From function to string, with its name, arguments and its body
function fn_string (fn) {
var name = fn.name, fn = fn.toString()


return { name: name,
args: fn.substring(fn.indexOf("(") + 1, fn.indexOf(")")),
body: fn.substring(fn.indexOf("{") + 1, fn.lastIndexOf("}"))
}
}
})
}


//random blocking function
function blockCpu(ms) {
var now = new Date().getTime(), result = 0
while(true) {
result += Math.random() * Math.random();
if (new Date().getTime() > now +ms)
return;
}
}

对于那些询问如何将其扩展到运行时确定的调用数量的人,可以使用2个循环。第一个开始所有的任务,第二个等待一切完成

console.clear();


function wait(ms, data) {
return new Promise( resolve => setTimeout(resolve.bind(this, data), ms) );
}


/**
* While here we call the functions first,
* then wait for the result later, so
* this will finish in 500ms.
*/
async function runTasks(timings) {
let tasks = [];
for (let i in timings) {
tasks.push(wait(timings[i], `Result of task ${i}`));
}


/* Want fast fail? use Promise.All */
//return Promise.All(tasks);
  

let results = [];
for (let task of tasks) {
results.push(await task);
}


return results;
}


async function taskRunner(fn, arg, label) {
const startTime = performance.now();
console.log(`Task ${label} starting...`);
let result = await fn(arg);
console.log(`Task ${label} finished in ${ Number.parseInt(performance.now() - startTime) } miliseconds with,`, result);
}


void taskRunner(runTasks, [50,100,200,60,500], 'Task List');

选择的答案提出了两种方法,即等待所有“衍生”异步函数的终止。

我的建议是使用 setImmediate(nodejs ~ 相当于 setTimeout (0))来产生每个异步函数,以便在所有函数完成之前运行每个函数并获得中间结果:

for (let i = 0; i < numSpawns; i++ ) {


// nodejs
setImmediate( async () => { console.log( await runAsyncFunction(msecsMax) ) } )


// browser
// substitute setImmediate with setTimeout( await runAsyncFunction, 0, msecsmax )


}

完整的演示代码

/**
* parallel.js
* demo, to "spawn" in parallel multiple async functions
*/


/**
* sleep
* warp setTimeout, returning a value
*
* @async
* @param {Number}  msecs number of milliseconds
* @return {Number} msecs
*/
function sleep(msecs) {
return new Promise(function(resolve /*, reject*/) {
setTimeout( () => { resolve(msecs) }, msecs )
})
}


/**
* randomInteger
* Returns a random integer number between min (inclusive) and max (inclusive)
* @param {Number}  min
* @return {Number} max
*/
function randomInteger(min, max) {
return Math.floor(Math.random() * (max - min + 1)) + min;
}




/**
* runAsyncFunction
* simulate an async function,
* returning, after a random number of msecs, the number of msecs
*
* @async
* @param {Number}  msecsMax max duration in milliseconds
* @return {Number} random number of msecs
*/
async function runAsyncFunction(msecsMax) {
const msecsMin = 500
return await sleep( randomInteger(msecsMin, msecsMax) )
}






async function parallel(numSpawns, msecsMax) {
for (let i = 0; i < numSpawns; i++ ) {


// nodejs
setImmediate( async () => { console.log( await runAsyncFunction(msecsMax) ) } )


// browser
// substitute setImmediate with setTimeout( await runAsyncFunction, 0, msecsmax )
  

}
}




async function main() {


const msecsMax = 3000
const numSpawns = 10
  

// runs in "parallel" 10 async functions,
// each one returning after a sleep of a random number of milliseconds (between 500 to 3000)
parallel(numSpawns, msecsMax)
}


main()

运行程序:

$ /usr/bin/time --verbose node parallel
1204
1869
1983
2042
2119
2220
2222
2611
2642
2660
Command being timed: "node parallel"
User time (seconds): 0.07
System time (seconds): 0.00
Percent of CPU this job got: 3%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:02.72
Average shared text size (kbytes): 0
Average unshared data size (kbytes): 0
Average stack size (kbytes): 0
Average total size (kbytes): 0
Maximum resident set size (kbytes): 31568
Average resident set size (kbytes): 0
Major (requiring I/O) page faults: 0
Minor (reclaiming a frame) page faults: 2160
Voluntary context switches: 39
Involuntary context switches: 1
Swaps: 0
File system inputs: 0
File system outputs: 0
Socket messages sent: 0
Socket messages received: 0
Signals delivered: 0
Page size (bytes): 4096
Exit status: 0