Fork/join 框架如何优于线程池?

使用新的 Fork/join 框架相对于在开始时简单地将大任务分割成 N 个子任务,将它们发送到一个缓存的线程池(来自 执行者)并等待每个任务完成,有什么好处呢?我没有看到使用 fork/join 抽象是如何简化问题的,或者是如何使解决方案比多年前更有效率的。

例如,教程示例中的并行模糊算法可以这样实现:

public class Blur implements Runnable {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;


private int mBlurWidth = 15; // Processing window size, should be odd.


public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}


public void run() {
computeDirectly();
}


protected void computeDirectly() {
// As in the example, omitted for brevity
}
}

在开始时拆分并将任务发送到线程池:

// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool


int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();


// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
int size = Math.min(maxSize, src.length - i);
ForkBlur task = new ForkBlur(src, i, size, dst);
Future f = threadPool.submit(task);
futures.add(f);
}


// Wait for all sent tasks to complete:
for (Future future : futures) {
future.get();
}


// Done!

这些任务进入线程池的队列,在工作线程可用时从队列执行这些任务。只要拆分的粒度足够大(以避免特别等待最后一个任务) ,并且线程池有足够多的线程(至少有 N 个处理器) ,所有处理器都会全速运行,直到完成整个计算。

我是否遗漏了什么? 使用 fork/join 框架的附加值是什么?

57156 次浏览

如果有 n 个繁忙的线程全部100% 独立工作,那么这将比 Fork-Join (FJ)池中的 n 个线程要好。但事情从来都不是那样的。

可能无法精确地将问题分解为 n 个相等的部分。即使您这样做了,线程调度也离公平还有一段距离。你最终会等到最慢的线程。如果你有多个任务,那么他们可以每个运行少于 n 路并行性(通常更有效率) ,但上升到 n 路时,其他任务已经完成。

所以我们为什么不把这个问题切成 FJ 大小的碎片,然后用一个线程池来解决它呢。典型的 FJ 用法把问题切成小块。以随机顺序执行这些操作需要在硬件级别进行大量协调。管理费用太高了。在 FJ 中,任务被放置到一个队列中,线程按照后进先出顺序(LIFO/栈)读取,工作窃取(通常在核心工作中)按照先进先出(FIFO/“队列”)完成。其结果是,长数组处理可以在很大程度上按顺序进行,即使它被分成很小的块。(同样的情况是,在一次大爆炸中将问题分解成均匀大小的小块可能并不是微不足道的。比如说处理一种没有平衡的层次结构。)

结论: FJ 允许在不均匀的情况下更有效地使用硬件线程,如果您有多个线程,这种情况将始终存在。

Fork/join 不同于线程池,因为它实现了工作窃取

与任何 ExecutorService 一样,fork/join 框架分配任务 Fork/join 框架是 因为它使用了一个工作窃取算法 无事可做的线程可以从其他线程窃取任务 还是很忙。

假设您有两个线程和4个任务 a、 b、 c、 d,它们分别需要1、1、5和6秒。最初,a 和 b 分配给线程1,c 和 d 分配给线程2。在线程池中,这需要11秒。使用 fork/join,线程1完成并可以从线程2窃取工作,因此任务 d 最终将由线程1执行。线程1执行 a、 b 和 d,线程2只执行 c。整个时间: 8秒,而不是11秒。

编辑: 正如 Joonas 指出的,任务不一定是预先分配给线程的。Fork/join 的思想是,线程可以选择将任务分割成多个子块。因此,重申以下几点:

我们有两个任务(ab)和(cd) ,分别需要2秒和11秒。线程1开始执行 ab,并将其分成两个子任务 a & b。与线程2类似,它将分成两个子任务 c & d。当线程1完成 a & b 时,它可以从线程2中窃取 d。

我认为最基本的误解在于,Fork/Join 示例只显示工作 偷窃,而只显示某种标准的分而治之。

工作盗窃是这样的: 工人 B 已经完成了他的工作。他是一个善良的人,所以他环顾四周,看到工人 A 仍然非常努力地工作。他走过去问: “嘿,小伙子,我可以帮你一把。”答复。“酷,我有1000个单元的任务。到目前为止,我已经完成了345离开655。你能不能帮我从673号到1000号,我从346号到672号。”B 说: “好的,我们开始吧,这样我们可以早点去酒吧。”

你看-工人们必须相互沟通,即使他们开始真正的工作。这是示例中缺少的部分。

另一方面,示例只显示了类似于“使用分包商”的内容:

工人 A: “该死,我有1000个单位的工作量。对我来说太多了。我会自己做500,然后把500转包给别人。”这个过程一直持续到大任务被分解成每个10个单元的小包。这些将由可用的工人执行。但如果一包毒药比其他包需要更长的时间运气不好,分裂阶段就结束了。

Fork/Join 和预先分割任务之间唯一的区别是: 当预先分割任务时,从一开始工作队列就已经满了。例如: 1000个单位,阈值是10,因此队列有100个条目。这些数据包被分发给线程池成员。

Fork/Join 更为复杂,它试图使队列中的数据包数量更小:

  • 步骤1: 将一个包含(1... 1000)的数据包放入队列
  • 步骤2: 一个工作人员弹出数据包(1... 1000)并用两个数据包替换它: (1... 500)和(501... 1000)。
  • 步骤3: 一个工作者弹出数据包(500... 1000)并推(500... 750)和(751... 1000)。
  • 步骤 n: 堆栈包含这些数据包: (1. . 500)、(500... 750)、(750... 875) ... (991. . 1000)
  • 步骤 n + 1: 弹出并执行数据包(991. .1000)
  • 步骤 n + 2: 弹出并执行数据包(981. .990)
  • 步骤 n + 3: 将数据包(961. . 980)弹出并拆分为(961. . 970)和(971. . 980)。 ....

您可以看到: 在 Fork/Join 中,队列更小(示例中为6) ,并且“拆分”和“工作”阶段是交叉的。

当多个工人同时挤压和推挤时,交互作用当然就不那么清晰了。

以上所说的利益都是正确的,都是通过偷工减料来实现的,但是要详细说明这是为什么。

主要的好处是工作线程之间的有效协调。这项工作必须分开和重新组合,这需要协调。正如你在 A.H 的答案上面看到的,每个线程都有自己的工作列表。此列表的一个重要属性是排序(顶部为大型任务,底部为小型任务)。每个线程执行其列表底部的任务,并从其他线程列表的顶部窃取任务。

结果是:

  • 任务列表的头部和尾部可以独立地进行同步,从而减少列表上的争用。
  • 工作的重要子树由同一个线程拆分和重新组装,因此这些子树不需要线程间的协调。
  • 当一个线程窃取工作,它需要一个大的部分,然后再细分到自己的名单
  • 工作钢意味着螺纹几乎被充分利用,直到工序结束。

大多数其他使用线程池的分治方案需要更多的线程间通信和协调。

另一个重要的区别似乎是,使用 F-J,您可以进行多个复杂的“加入”阶段。考虑从 http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html的合并排序,将有太多的编排需要预分割这项工作。例如:。你需要做以下事情:

  • 整理第一季度
  • 整理第二季度
  • 合并前两节
  • 排序第三季度
  • 把第四季度分类
  • 合并最后两个季度
  • 合并两半

如何指定必须在与它们相关的合并之前进行排序等等。

我一直在寻找如何最好地为每一个项目清单做某件事情。我想我只需要预先分割列表并使用标准的 ThreadPool。当工作不能被预先分割成足够独立的任务,但可以被递归地分割成彼此独立的任务时,F-J 看起来最有用(例如,排序的一半是独立的,但合并两个排序的一半成为一个排序的整体却不是)。

当你有昂贵的合并操作时,F/J 也有一个明显的优势。因为它分裂成一个树结构,所以只需要 log2(n) merge,而不需要 n 个线性线程分裂的 merge。(这确实使得理论上假设你有和线程一样多的处理器,但仍然是一个优势)对于一个家庭作业,我们必须合并数千个2D 数组(所有相同的维度)通过总和的值在每个索引。使用 fork join 和 P 处理器,当 P 接近无穷大时,时间接近 log2(n)。

123... 731... 854
456 + 243 = > 699
789... 110... 899

在本例中,Fork/Join 没有添加任何值,因为不需要分叉,工作负载在工作线程之间均匀分配。分叉/联接只会增加开销。

这是关于这个话题的 文章不错台词:

总的来说,我们可以说 ThreadPoolExecator 是首选的 其中工作负载均匀地分布在工作线程之间 为了保证这一点,您确实需要准确地知道输入数据是什么 相比之下,ForkJoinPool 提供了良好的性能 无论输入数据如何,因此是一个明显更健全的 解决方案。

你会惊讶于 ForkJoin 在应用程序中像爬虫一样的性能。 这是你能学到的最好的 教程

Fork/Join 的逻辑非常简单: (1)分离(Fork)每个大型任务 分解成较小的任务; (2)在单独的线程中处理每个任务 (如有需要,把这些工作分成更小的工作) ; 结果。

如果问题是这样的,我们必须等待其他线程完成(例如对数组或数组和进行排序) ,那么应该使用 fork 连接,因为 Execator (Executors.newFixedThreadPool (2))会因为线程数量有限而阻塞。 在这种情况下,forkjoin 池将创建更多的线程来掩盖被阻塞的线程,以保持相同的并行性

译自: 美国《科学》杂志网站(http://www.oracle.com/technetwork/article/java/fork-join-422606.html)原文链接: http://www.oracle.com/technetwork/article/java/fork-join-422606.html/rel = “ nofollow”http://www.oracle.com/technetwork/articles/java/fork-join-422606.html

实现分治算法的执行器的问题与创建子任务无关,因为 Callable 可以自由地向其执行器提交新的子任务,并以同步或异步方式等待结果。问题在于并行性: 当一个 Callable 等待另一个 Callable 的结果时,它将处于等待状态,从而浪费了处理排队等待执行的另一个 Callable 的机会。

通过 Doug Lea 的努力,将 fork/join 框架添加到 Java SE 7中的 Java.util.concurlpackage 中,从而填补了这一空白

来源: http://strong > < a href = “ https://docs.oracle.com/javase/7/docs/api/java/util/while/ForkJoinPool.html”rel = “ nofollow”> https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/forkjoinpool.html

该池通过动态添加、挂起或恢复内部工作线程来试图维护足够的活动(或可用)线程,即使某些任务在等待加入其他任务时停滞不前。但是,面对阻塞的 IO 或其他非托管同步时,无法保证进行这样的调整

Public int getPoolSize () 返回已启动但尚未终止的工作线程数。这个方法返回的结果可能不同于 getParallelism () ,当其他线程被合作阻塞时,创建线程是为了维护并行性。

线程池和 Fork/Join 的最终目标是相似的: 两者都希望最大限度地利用可用的 CPU 能量以实现最大吞吐量。最大吞吐量意味着应该在较长的时间内完成尽可能多的任务。要做到这一点需要什么?(对于下面的内容,我们假设并不缺少计算任务: 总是有足够的事情可以做,以达到100% 的 CPU 利用率。另外,在超线程的情况下,我使用“ CPU”等效于内核或虚拟内核)。

  1. 至少需要有尽可能多的线程在运行,因为运行较少的线程会使核心不能使用。
  2. 在最大限度上,运行的线程数量必须和可用 CPU 数量一样多,因为运行更多的线程会给调度程序带来额外的负载,调度程序会将 CPU 分配给不同的线程,这会导致一些 CPU 时间流向调度程序,而不是我们的计算任务。

因此,我们发现,为了获得最大的吞吐量,我们需要拥有与 CPU 完全相同的线程数量。在 Oracle 的模糊示例中,您可以选择一个固定大小的线程池,其线程数量等于可用 CPU 的数量,也可以使用一个线程池。没用的,你说得对!

那么线程池什么时候会出问题呢?也就是说,如果一个线程阻塞 ,因为您的线程正在等待另一个任务完成。假设以下例子:

class AbcAlgorithm implements Runnable {
public void run() {
Future<StepAResult> aFuture = threadPool.submit(new ATask());
StepBResult bResult = stepB();
StepAResult aResult = aFuture.get();
stepC(aResult, bResult);
}
}

我们在这里看到的是一个由三个步骤 A、 B 和 C 组成的算法。A 和 B 可以彼此独立执行,但是步骤 C 需要步骤 A 和 B 的结果。这个算法所做的是将任务 A 提交到线程池并直接执行任务 b。之后,线程将等待任务 A 完成,并继续执行步骤 C。如果 A 和 B 是同时完成的,那么一切都很好。但如果 A 花的时间比 B 长呢?这可能是因为任务 A 的性质决定了它,但也可能是因为没有 任务 A 的线程在开始时可用,任务 A 需要等待。(如果只有一个 CPU 可用,因此线程池只有一个线程,这甚至会导致死锁,但目前这还不是重点)。重点是刚刚执行任务 B 阻碍了整个线程的线程。因为我们有相同数量的线程作为 CPU 和一个线程被阻塞,这意味着 一个 CPU 空闲

Fork/Join 解决了这个问题: 在 Fork/Join 框架中,你可以编写如下相同的算法:

class AbcAlgorithm implements Runnable {
public void run() {
ATask aTask = new ATask());
aTask.fork();
StepBResult bResult = stepB();
StepAResult aResult = aTask.join();
stepC(aResult, bResult);
}
}

看起来一样,不是吗?然而线索是 aTask.join 不会阻止。相反,这里是 偷工作发挥作用的地方: 该线程将寻找过去分叉的其他任务,并将继续这些任务。首先,它检查自己分叉的任务是否已经开始处理。因此,如果 A 还没有被另一个线程启动,它将接着执行 A,否则它将检查其他线程的队列并窃取它们的工作。一旦另一个线程的另一个任务完成,它将检查 A 现在是否已经完成。如果是上面的算法可以调用 stepC。否则,它将寻找另一个任务来窃取。这就是 Fork/join 池可以实现100% 的 CPU 利用率,即使在遇到阻塞操作时也是如此

然而,这里有一个陷阱: 工作盗窃只有在 ForkJoinTaskjoin呼叫时才可能发生。不能对外部阻塞操作(如等待另一个线程或等待 I/O 操作)执行此操作。那么,等待 I/O 完成是一个常见的任务,这又是怎么回事呢?在这种情况下,如果我们可以添加一个额外的线程到 Fork/Join 池,它将在阻塞操作完成后立即停止,这将是第二个最好的选择。如果我们使用 ManagedBlockerForkJoinPool实际上可以做到这一点。

斐波那契数

用于递归任务的 JavaDoc中有一个使用 Fork/Join 计算斐波那契数的例子:

public static int fib(int n) {
if (n <= 1) {
return n;
}
return fib(n - 1) + fib(n - 2);
}

正如在 JavaDocs 中解释的那样,这是计算 fibonacci 数的一种相当垃圾的方法,因为这种算法具有 O (2 ^ n)的复杂性,而简单的方法是可能的。然而,这个算法非常简单,容易理解,所以我们坚持使用它。假设我们希望使用 Fork/Join 来加快速度。一个幼稚的实现应该是这样的:

class Fibonacci extends RecursiveTask<Long> {
private final long n;


Fibonacci(long n) {
this.n = n;
}


public Long compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}

Task 分解的步骤太短了,因此执行起来会很糟糕,但是您可以看到框架通常是如何工作得非常好的: 两个汇总可以单独计算,但是我们需要两个汇总来构建最终结果。所以一半是在另一个线程中完成的。对线程池进行同样的操作,而不会出现死锁(可能,但远没有那么简单)。

为了完整起见: 如果你真的想用这种递归方法计算斐波那契数,这里有一个优化版本:

class FibonacciBigSubtasks extends RecursiveTask<Long> {
private final long n;


FibonacciBigSubtasks(long n) {
this.n = n;
}


public Long compute() {
return fib(n);
}


private long fib(long n) {
if (n <= 1) {
return 1;
}
if (n > 10 && getSurplusQueuedTaskCount() < 2) {
final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
f1.fork();
return f2.compute() + f1.join();
} else {
return fib(n - 1) + fib(n - 2);
}
}
}

这使得子任务变得更小,因为它们只有在 n > 10 && getSurplusQueuedTaskCount() < 2为 true 时才会被拆分,这意味着有100多个方法调用要做(n > 10) ,并且没有非常多的 man 任务已经在等待(getSurplusQueuedTaskCount() < 2)。

在我的计算机上(4核(计算超线程时为8核) ,Intel (R) Core (TM) i7-2720QM CPU@2.20 GHz) ,使用传统方法需要64秒,使用 Fork/Join 方法只需要18秒,这是一个相当明显的增益,尽管在理论上可能性不大。

摘要

  • 是的,在您的示例中,Fork/Join 与传统的线程池相比没有优势。
  • 当涉及到阻塞时,Fork/Join 可以极大地提高性能
  • Fork/Join 避免了一些死锁问题

我想为那些没有太多时间阅读长答案的人补充一个简短的答案。这个比较来自《应用阿卡模式:

您决定是使用 fork-join-execute 还是使用 线程池执行器在很大程度上取决于 这个调度程序将被阻塞 活动线程的最大数量,而线程池执行器给出 一个固定数量的线程。如果线程被阻塞,则 Fork-join-Execator 将创建更多内容,而线程池执行器将创建更多内容 对于阻塞操作,通常最好使用 线程池执行器,因为它防止您的线程计数从 爆炸。更多的“反应性”操作在一个 叉连接执行器叉连接执行器。