Java 8并行流中的自定义线程池

是否可以为Java 8 平行流指定一个自定义线程池?我到处都找不到。

假设我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大,而且是多线程的,所以我想对它进行划分。我不希望在来自另一个模块的applicationblock任务的一个模块中运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数实际情况下安全地使用并行流。

试试下面的例子。有一些CPU密集型任务在单独的线程中执行。 任务利用并行流。第一个任务中断,因此每一步花费1秒(通过线程睡眠模拟)。问题是其他线程卡住,等待中断的任务完成。这是一个虚构的例子,但是想象一下servlet应用程序和某人向共享fork连接池提交了一个长时间运行的任务。< / p >

public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();


es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));




es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}


private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}


public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
253484 次浏览

并行流使用默认的ForkJoinPool.commonPool,即默认情况下,当您有处理器时,线程会减少一个,由Runtime.getRuntime().availableProcessors()返回(这意味着并行流为调用线程留下一个处理器)。

对于需要单独或自定义池的应用程序,ForkJoinPool可以用给定的目标并行度级别来构造;默认情况下,等于可用处理器的数量。

这也意味着如果你有嵌套并行流或多个并行流并发启动,它们都将分享相同的池。优点:使用的处理器数量永远不会超过默认值(可用处理器数量)。缺点:你可能得不到“所有的处理器”;分配给您发起的每个并行流(如果您碰巧有多个并行流)。(显然你可以使用ManagedBlocker来规避这一点。)

要更改并行流的执行方式,您可以使用以下两种方法

  • 提交并行流执行到你自己的ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
  • 你可以使用系统属性:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")来改变公共池的大小,目标并行度为20个线程。

后者的例子在我的机器上有8个处理器。如果我运行以下程序:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.print((System.currentTimeMillis() - start) + " ");
});

输出结果为:

215 216 216 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416 416

所以你可以看到并行流一次处理8个项目,也就是说它使用8个线程。然而,如果我取消注释注释行,输出是:

215 215 215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

这一次,并行流使用了20个线程,流中的所有20个元素都被并发处理。

实际上,在特定的fork-join池中执行并行操作是有技巧的。如果您将其作为fork-join池中的任务执行,则它将停留在那里,而不使用公共池。

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}

该技巧基于ForkJoinTask.fork,它指定:“安排在当前任务正在运行的池中异步执行此任务,如果适用,或者如果不使用__abc2,则使用ForkJoinPool.commonPool()”;

除了在你自己的forkJoinPool中触发并行计算之外,你还可以将这个forkJoinPool传递给CompletableFuture。像下面这样的async方法:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
//parallel task here, for example
range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()),
forkJoinPool
);

要测量实际使用的线程数,可以检查Thread.activeCount():

    Runnable r = () -> IntStream
.range(-42, +42)
.parallel()
.map(i -> Thread.activeCount())
.max()
.ifPresent(System.out::println);


ForkJoinPool.commonPool().submit(r).join();
new ForkJoinPool(42).submit(r).join();

这可以在4核CPU上产生如下输出:

5 // common pool
23 // custom pool

如果没有.parallel(),它会给出:

3 // common pool
4 // custom pool

到目前为止,我使用了这个问题的答案中描述的解决方案。现在,我提出了一个名为并行流支持的小库:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
.filter(PrimesPrint::isPrime)
.collect(toList())

但是正如@PabloMatiasGomez在评论中指出的那样,并行流的分割机制存在缺陷,它严重依赖于公共池的大小。参见来自HashSet的并行流不会并行运行

我使用这个解决方案只是为了对不同类型的工作有单独的池,但即使我不使用它,我也不能将公共池的大小设置为1。

原来的解决方案(设置ForkJoinPool公共并行性属性)不再有效。看看原始答案中的链接,打破这一点的更新已经被回移植到Java 8。正如链接线程中提到的,这个解决方案并不能保证永远有效。基于此,解决方案是forkjoinpool。提交接受答案中讨论的.get解决方案。我认为后端口修复了这个解决方案的不可靠性。

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
.forEach((int theInt) ->
{
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.println(Thread.currentThread().getName() + " -- " + theInt);
})).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
.forEach((theInt) ->
{
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.println(Thread.currentThread().getName() + " -- " + theInt);
})).get();

去获取abacus-common。并行流可指定线程数。下面是示例代码:

LongStream.range(4, 1_000_000).parallel(threadNum)...

披露:我是abacus-common的开发者。

如果你不介意使用第三方库,使用cyclops-react你可以在同一个管道中混合顺序流和并行流,并提供自定义ForkJoinPools。例如

 ReactiveSeq.range(1, 1_000_000)
.foldParallel(new ForkJoinPool(10),
s->s.filter(i->true)
.peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
.max(Comparator.naturalOrder()));

或者希望继续在顺序流中处理

 ReactiveSeq.range(1, 1_000_000)
.parallel(new ForkJoinPool(10),
s->s.filter(i->true)
.peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
.map(this::processSequentially)
.forEach(System.out::println);

[披露我是cyclops-react的主要开发者]

我尝试了自定义 ForkJoinPool,如下所示来调整池的大小:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
return () -> aList.parallelStream()
.peek((i) -> {
String threadName = Thread.currentThread().getName();
ThreadNameSet.add(threadName);
})
.reduce(0L, Long::sum);
}


private static void testForkJoinPool() {
final int parallelism = 10;


ForkJoinPool forkJoinPool = null;
Long result = 0L;
try {
forkJoinPool = new ForkJoinPool(parallelism);
result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call


} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown(); //always remember to shutdown the pool
}
}
out.println(result);
out.println(ThreadNameSet);
}

这里的输出显示池使用的线程比默认的4更多。

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

但实际上存在怪人,当我试图使用ThreadPoolExecutor实现相同的结果时,如下所示:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

但我失败了。

它只会在一个新线程中启动parallelStream,然后其他一切都是一样的,这再一次证明了parallelStream将使用的ForkJoinPool来启动它的子线程。

<强>注意: JDK 10中似乎实现了一个修复,以确保自定义线程池使用预期的线程数 自定义ForkJoinPool中的并行流执行应该遵守并行性 https://bugs.openjdk.java.net/browse/JDK-8190974 < / p >

如果你不需要自定义线程池,但你想要限制并发任务的数量,你可以使用:

List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method


partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> {
// do your processing
}));

(重复的问题被锁定了,所以请原谅我在这里)

如果你不想依赖于实现技巧,总有一种方法可以通过实现将mapcollect语义结合在一起的自定义收集器来实现同样的目标……并且你不会局限于ForkJoinPool:

list.stream()
.collect(parallel(i -> process(i), executor, 4))
.join()

幸运的是,它已经在这里完成,并在Maven Central上可用: http://github.com/pivovarit/parallel-collectors < / p >

免责声明:是我写的,并为此负责。

我们可以使用以下属性更改默认的并行度:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

可以设置为使用更多的并行性。

下面是我如何通过编程方式设置上面提到的最大线程数标志,以及一段代码来验证该参数是否符合要求

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
Set<String> threadNames = Stream.iterate(0, n -> n + 1)
.parallel()
.limit(100000)
.map(i -> Thread.currentThread().getName())
.collect(Collectors.toSet());
System.out.println(threadNames);


// Output -> [ForkJoinPool.commonPool-worker-1, Test worker, ForkJoinPool.commonPool-worker-3]

我使实用工具方法并行运行任务与参数定义最大线程数。

public static void runParallel(final int maxThreads, Runnable task) throws RuntimeException {
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(maxThreads);
forkJoinPool.submit(task).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}

它创建具有最大允许线程数的ForkJoinPool,并在任务完成(或失败)后关闭它。

用法如下:

final int maxThreads = 4;
runParallel(maxThreads, () ->
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList()));

(目前)公认的答案有一部分是错误的。只需要将并行流submit()到专用的fork-join池就足够了。在这种情况下,流将使用该池的线程和另外公共fork-join-pool,甚至调用线程来处理流的工作负载,这似乎取决于公共fork-join池的大小。这种行为有点奇怪,但绝对不是必需的。

要实际将工作完全限制到专用池,必须将其封装到CompletableFuture中:

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = CompletableFuture.supplyAsync(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList()),
forkJoinPool)  // <- passes dedicated fork-join pool as executor
.join();  // <- Wait for result from forkJoinPool
System.out.println(primes);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}

这段代码与Java 8u352和Java 17.0.1上forkJoinPool中的所有操作保持一致。