Java8: 并行 FOR 循环

我听说 Java 8提供了很多关于并发计算的实用程序。因此,我想知道什么是并行给定 for 循环的最简单的方法?

public static void main(String[] args)
{
Set<Server> servers = getServers();
Map<String, String> serverData = new ConcurrentHashMap<>();


for (Server server : servers)
{
String serverId = server.getIdentifier();
String data = server.fetchData();


serverData.put(serverId, data);
}
}
95673 次浏览

That would be using a Stream:

servers.parallelStream().forEach(server -> {
serverData.put(server.getIdentifier(), server.fetchData());
});

I suspect a Collector can be used to greater effect here, since you use a concurrent collection.

Read up on streams, they're all the new rage.

Pay especially close attention to the bit about parallelism:

"Processing elements with an explicit for-loop is inherently serial. Streams facilitate parallel execution by reframing the computation as a pipeline of aggregate operations, rather than as imperative operations on each individual element. All streams operations can execute either in serial or in parallel."

So to recap, there are no parallel for-loops, they're inherently serial. Streams however can do the job. Take a look at the following code:

    Set<Server> servers = getServers();
Map<String, String> serverData = new ConcurrentHashMap<>();


servers.parallelStream().forEach((server) -> {
serverData.put(server.getIdentifier(), server.fetchData());
});

More elegant or functional solution will be just using Collectors toMap or toConcurrentMap function, which avoid maintaining another stateful variable for ConcurrentHashMap, as following example:

final Set<Server> servers = getServers();
Map<String, String> serverData = servers.parallelStream().collect(
toConcurrentMap(Server::getIdentifier, Server::fetchData));

Note: 1. Those functional interfaces (Server::getIdentifier or Server::fetchData) doesn't allow throw checked exception here, 2. To get the full benefits of parallel stream, the number of servers would be large and there is no I/O involved, purely data processing in those functions(getIdentifier, fetchData)

Please refer to Collectors javadoc at http://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html#toConcurrentMap

Simple example to copy'n'paste (the examples above use the class Server which is a custom class written by the OP):

import java.io.Console;
import java.util.ArrayList;


ArrayList<String> list = new ArrayList<>();
list.add("Item1");
list.add("Item2");
list.parallelStream().forEach((o) -> {
System.out.print(o);
});

Console output. The order could possibly vary as everything executes in parallel:

Item1
Item2

The .parallelStream() method was introduced in Java v8. This example was tested with JDK v1.8.0_181.