协调 node.js 中的并行执行

Js 的事件驱动程式设计模型使得协调程序流有些棘手。

简单的顺序执行转换为嵌套的回调,这非常容易(尽管写下来有点复杂)。

但是并行执行呢?假设您有三个任务 A、 B、 C,它们可以并行运行,当它们完成时,您希望将它们的结果发送到任务 D。

如果使用 fork/join 模型,这将是

  • 叉子 A
  • 叉子 B
  • 叉子 C
  • 加入 A,B,C,运行 D

如何在 node.js 中编写它?有什么最佳实践或食谱吗?我必须每次 手卷解决方案,还是有一些图书馆的帮助?

37478 次浏览

这里可能有一个简单的解决方案: http://howtonode.org/control-flow-part-ii滚动到并行操作。另一种方法是让 A、 B 和 C 共享同一个回调函数,让这个函数有一个全局的或者至少是函数外的递增器,如果这三个函数都调用了回调函数,那么让它运行 D,当然你也必须把 A、 B 和 C 的结果存储在某个地方。

Js 中没有什么是真正并行的,因为它是单线程的。但是,可以安排多个事件并按照无法事先确定的顺序运行。而且像数据库访问这样的事情实际上是“并行的”,因为数据库查询本身在不同的线程中运行,但是在完成时会重新集成到事件流中。

那么,如何安排对多个事件处理程序的回调呢?这是浏览器端 javascript 动画中常用的一种技术: 使用一个变量来跟踪完成过程。

这听起来像一个黑客,它是,它听起来潜在的混乱留下一堆全局变量周围做跟踪和在一个较小的语言它会。但是在 javascript 中我们可以使用闭包:

function fork (async_calls, shared_callback) {
var counter = async_calls.length;
var callback = function () {
counter --;
if (counter == 0) {
shared_callback()
}
}


for (var i=0;i<async_calls.length;i++) {
async_calls[i](callback);
}
}


// usage:
fork([A,B,C],D);

在上面的示例中,我们假设异步函数和回调函数不需要参数,从而使代码保持简单。当然,您可以修改代码,将参数传递给异步函数,并让回调函数累积结果,然后将其传递给 share _ callback 函数。


补充答案:

实际上,即使是这样,fork()函数也已经可以使用闭包将参数传递给异步函数:

fork([
function(callback){ A(1,2,callback) },
function(callback){ B(1,callback) },
function(callback){ C(1,2,callback) }
],D);

剩下要做的唯一一件事就是积累 A、 B、 C 的结果,然后把它们传递给 D。


更多的补充答案是:

我忍不住。早餐的时候我一直在想这件事。下面是一个累积结果的 fork()实现(通常作为参数传递给回调函数) :

function fork (async_calls, shared_callback) {
var counter = async_calls.length;
var all_results = [];
function makeCallback (index) {
return function () {
counter --;
var results = [];
// we use the arguments object here because some callbacks
// in Node pass in multiple arguments as result.
for (var i=0;i<arguments.length;i++) {
results.push(arguments[i]);
}
all_results[index] = results;
if (counter == 0) {
shared_callback(all_results);
}
}
}


for (var i=0;i<async_calls.length;i++) {
async_calls[i](makeCallback(i));
}
}

这很简单,这使得 fork()具有相当的通用性,可以用来同步多个非同质事件。

在 Node.js 中的使用示例:

// Read 3 files in parallel and process them together:


function A (c){ fs.readFile('file1',c) };
function B (c){ fs.readFile('file2',c) };
function C (c){ fs.readFile('file3',c) };
function D (result) {
file1data = result[0][1];
file2data = result[1][1];
file3data = result[2][1];


// process the files together here
}


fork([A,B,C],D);

更新

这些代码是在诸如 sync.js 或各种基于承诺的库存在之前编写的。我很愿意相信这个异步 c.js 是受到这个启发的,但是我没有任何证据证明它。不管怎样。.如果您今天正在考虑这样做,那么可以看一下 sync.js 或者承诺。只要考虑一下上面的答案,就可以很好地解释或说明类似的事情是如何工作的。

为了完整起见,下面是使用 async.parallel的方法:

var async = require('async');


async.parallel([A,B,C],D);

请注意,async.parallel的工作原理与我们上面实现的 fork函数完全相同。主要的区别在于,按照 node.js 约定,它将错误作为第一个参数传递给 D,并将回调作为第二个参数传递。

使用承诺,我们会这样写:

// Assuming A, B & C return a promise instead of accepting a callback


Promise.all([A,B,C]).then(D);

另一个选项可以是 Node: https://github.com/creationix/step的 Step 模块

我相信现在的“异步”模块提供了这种并行功能,与上面的 fork 函数大致相同。

未来模块有一个名为 加入的子模块,我喜欢使用这个子模块:

将异步调用连接在一起,类似于 pthread_join对线程的工作方式。

自述文件显示了一些使用它的自由式或者使用使用臭氧保证模式的 未来子模块的好例子。来自文档的例子:

var Join = require('join')
, join = Join()
, callbackA = join.add()
, callbackB = join.add()
, callbackC = join.add();


function abcComplete(aArgs, bArgs, cArgs) {
console.log(aArgs[1] + bArgs[1] + cArgs[1]);
}


setTimeout(function () {
callbackA(null, 'Hello');
}, 300);


setTimeout(function () {
callbackB(null, 'World');
}, 500);


setTimeout(function () {
callbackC(null, '!');
}, 400);


// this must be called after all
join.when(abcComplete);

您可能想尝试这个小库: https://www.npmjs.com/package/parallel-io

除了流行的承诺和异步库之外,还有第三种优雅的方式——使用“连接”:

var l = new Wire();


funcA(l.branch('post'));
funcB(l.branch('comments'));
funcC(l.branch('links'));


l.success(function(results) {
// result will be object with results:
// { post: ..., comments: ..., links: ...}
});

Https://github.com/garmoshka-mo/mo-wire