Java 的“并行. 用于”?

我想知道是否有一个相当于 Java 的.net 版本的 平行

如果有人可以提供一个例子吗? 谢谢!

44279 次浏览

Java 7 中的 Fork 连接框架是用于并发支持的,但是我不知道 Parallel.For是否有一个精确的等价物。

我想最接近的事情就是:

ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);
try {
for (final Object o : list) {
exec.submit(new Runnable() {
@Override
public void run() {
// do stuff with o.
}
});
}
} finally {
exec.shutdown();
}

Based on TheLQ's comments, you would set SUM_NUM_THREADS to Runtime.getRuntime().availableProcessors();

编辑: 决定添加一个基本的“并行。为”实现

public class Parallel {
private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();


private static final ExecutorService forPool = Executors.newFixedThreadPool(NUM_CORES * 2, new NamedThreadFactory("Parallel.For"));


public static <T> void For(final Iterable<T> elements, final Operation<T> operation) {
try {
// invokeAll blocks for us until all submitted tasks in the call complete
forPool.invokeAll(createCallables(elements, operation));
} catch (InterruptedException e) {
e.printStackTrace();
}
}


public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements, final Operation<T> operation) {
List<Callable<Void>> callables = new LinkedList<Callable<Void>>();
for (final T elem : elements) {
callables.add(new Callable<Void>() {
@Override
public Void call() {
operation.perform(elem);
return null;
}
});
}


return callables;
}


public static interface Operation<T> {
public void perform(T pParameter);
}
}

并行的用法示例

// Collection of items to process in parallel
Collection<Integer> elems = new LinkedList<Integer>();
for (int i = 0; i < 40; ++i) {
elems.add(i);
}
Parallel.For(elems,
// The operation to perform with each item
new Parallel.Operation<Integer>() {
public void perform(Integer param) {
System.out.println(param);
};
});

我想这个实现实际上更类似于 平行

剪辑 如果有人感兴趣的话,我把这个放在 GitHub 上。 < a href = “ https://GitHub.com/tantaman/commons/master/src/main/java/com/tantaman/commons/current/Parallel.java”rel = “ noReferrer”> 在 GitHub 上的并行

一个更简单的选择是

// A thread pool which runs for the life of the application.
private static final ExecutorService EXEC =
Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);


//later
EXEC.invokeAll(tasks); // you can optionally specify a timeout.

MLaw 的解决方案是一个非常实用的并行。

public class Parallel
{
static final int iCPU = Runtime.getRuntime().availableProcessors();


public static <T> void ForEach(Iterable <T> parameters,
final LoopBody<T> loopBody)
{
ExecutorService executor = Executors.newFixedThreadPool(iCPU);
List<Future<?>> futures  = new LinkedList<Future<?>>();


for (final T param : parameters)
{
Future<?> future = executor.submit(new Runnable()
{
public void run() { loopBody.run(param); }
});


futures.add(future);
}


for (Future<?> f : futures)
{
try   { f.get(); }
catch (InterruptedException e) { }
catch (ExecutionException   e) { }
}


executor.shutdown();
}


public static void For(int start,
int stop,
final LoopBody<Integer> loopBody)
{
ExecutorService executor = Executors.newFixedThreadPool(iCPU);
List<Future<?>> futures  = new LinkedList<Future<?>>();


for (int i=start; i<stop; i++)
{
final Integer k = i;
Future<?> future = executor.submit(new Runnable()
{
public void run() { loopBody.run(k); }
});
futures.add(future);
}


for (Future<?> f : futures)
{
try   { f.get(); }
catch (InterruptedException e) { }
catch (ExecutionException   e) { }
}


executor.shutdown();
}
}


public interface LoopBody <T>
{
void run(T i);
}


public class ParallelTest
{
int k;


public ParallelTest()
{
k = 0;
Parallel.For(0, 10, new LoopBody <Integer>()
{
public void run(Integer i)
{
k += i;
System.out.println(i);
}
});
System.out.println("Sum = "+ k);
}


public static void main(String [] argv)
{
ParallelTest test = new ParallelTest();
}
}

有一个平行的等价物。作为 java 扩展提供。它被称为 Ateji PX,他们有一个免费的版本,你可以玩。http://www.ateji.com/px/index.html

它完全等效于盲文的并且看起来类似于。

For ||

更多维基百科的例子和解释: Http://en.wikipedia.org/wiki/ateji_px

Java 中的封闭事物

基于错误建议,添加 CountDownLatch。 添加块大小以减少提交()。

当使用400万个项目数组进行测试时,这个 为() on 提供5倍于顺序的速度 我的酷睿 i72630QM CPU。

public class Loop {
public interface Each {
void run(int i);
}


private static final int CPUs = Runtime.getRuntime().availableProcessors();


public static void withIndex(int start, int stop, final Each body) {
int chunksize = (stop - start + CPUs - 1) / CPUs;
int loops = (stop - start + chunksize - 1) / chunksize;
ExecutorService executor = Executors.newFixedThreadPool(CPUs);
final CountDownLatch latch = new CountDownLatch(loops);
for (int i=start; i<stop;) {
final int lo = i;
i += chunksize;
final int hi = (i<stop) ? i : stop;
executor.submit(new Runnable() {
public void run() {
for (int i=lo; i<hi; i++)
body.run(i);
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {}
executor.shutdown();
}


public static void main(String [] argv) {
Loop.withIndex(0, 9, new Loop.Each() {
public void run(int i) {
System.out.println(i*10);
}
});
}
}

我有一个更新的 Java 并行类可以做并行。为了,平行。每一个,平行。任务和分区的并行循环。源代码如下:

使用这些并行循环的例子如下:

public static void main(String [] argv)
{
//sample data
final ArrayList<String> ss = new ArrayList<String>();


String [] s = {"a", "b", "c", "d", "e", "f", "g"};
for (String z : s) ss.add(z);
int m = ss.size();


//parallel-for loop
System.out.println("Parallel.For loop:");
Parallel.For(0, m, new LoopBody<Integer>()
{
public void run(Integer i)
{
System.out.println(i +"\t"+ ss.get(i));
}
});


//parallel for-each loop
System.out.println("Parallel.ForEach loop:");
Parallel.ForEach(ss, new LoopBody<String>()
{
public void run(String p)
{
System.out.println(p);
}
});


//partitioned parallel loop
System.out.println("Partitioned Parallel loop:");
Parallel.ForEach(Parallel.create(0, m), new LoopBody<Partition>()
{
public void run(Partition p)
{
for(int i=p.start; i<p.end; i++)
System.out.println(i +"\t"+ ss.get(i));
}
});


//parallel tasks
System.out.println("Parallel Tasks:");
Parallel.Tasks(new Task []
{
//task-1
new Task() {public void run()
{
for(int i=0; i<3; i++)
System.out.println(i +"\t"+ ss.get(i));
}},


//task-2
new Task() {public void run()
{
for (int i=3; i<6; i++)
System.out.println(i +"\t"+ ss.get(i));
}}
});
}

下面是我对 https://github.com/pablormier/parallel-loops这个主题的贡献。用法很简单:

Collection<String> upperCaseWords =
Parallel.ForEach(words, new Parallel.F<String, String>() {
public String apply(String s) {
return s.toUpperCase();
}
});

还可以改变一些行为方面,比如线程的数量(默认情况下,它使用缓存的线程池) :

Collection<String> upperCaseWords =
new Parallel.ForEach<String, String>(words)
.withFixedThreads(4)
.apply(new Parallel.F<String, String>() {
public String apply(String s) {
return s.toUpperCase();
}
}).values();

所有代码都是 在一个 java 类中自包含,并且没有比 JDK 更多的依赖项。我还鼓励您使用 Java8以函数式的方式检查 并行化的新方法

同步通常会终止并行 for 循环的加速。因此,并行 for 循环通常需要它们的私有数据和一种减少机制来减少所有线程私有数据以构成单个结果。

因此,我已经扩展了并行。对于 Weimin Xiao的版本,通过一个还原机制。

public class Parallel {
public static interface IntLoopBody {
void run(int i);
}


public static interface LoopBody<T> {
void run(T i);
}


public static interface RedDataCreator<T> {
T run();
}


public static interface RedLoopBody<T> {
void run(int i, T data);
}


public static interface Reducer<T> {
void run(T returnData, T addData);
}


private static class ReductionData<T> {
Future<?> future;
T data;
}


static final int nCPU = Runtime.getRuntime().availableProcessors();


public static <T> void ForEach(Iterable <T> parameters, final LoopBody<T> loopBody) {
ExecutorService executor = Executors.newFixedThreadPool(nCPU);
List<Future<?>> futures  = new LinkedList<Future<?>>();


for (final T param : parameters) {
futures.add(executor.submit(() -> loopBody.run(param) ));
}


for (Future<?> f : futures) {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
System.out.println(e);
}
}
executor.shutdown();
}


public static void For(int start, int stop, final IntLoopBody loopBody) {
final int chunkSize = (stop - start + nCPU - 1)/nCPU;
final int loops = (stop - start + chunkSize - 1)/chunkSize;
ExecutorService executor = Executors.newFixedThreadPool(loops);
List<Future<?>> futures  = new LinkedList<Future<?>>();


for (int i=start; i < stop; ) {
final int iStart = i;
i += chunkSize;
final int iStop = (i < stop) ? i : stop;


futures.add(executor.submit(() -> {
for (int j = iStart; j < iStop; j++)
loopBody.run(j);
}));
}


for (Future<?> f : futures) {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
System.out.println(e);
}
}
executor.shutdown();
}


public static <T> void For(int start, int stop, T result, final RedDataCreator<T> creator, final RedLoopBody<T> loopBody, final Reducer<T> reducer) {
final int chunkSize = (stop - start + nCPU - 1)/nCPU;
final int loops = (stop - start + chunkSize - 1)/chunkSize;
ExecutorService executor = Executors.newFixedThreadPool(loops);
List<ReductionData<T>> redData  = new LinkedList<ReductionData<T>>();


for (int i = start; i < stop; ) {
final int iStart = i;
i += chunkSize;
final int iStop = (i < stop) ? i : stop;
final ReductionData<T> rd = new ReductionData<T>();


rd.data = creator.run();
rd.future = executor.submit(() -> {
for (int j = iStart; j < iStop; j++) {
loopBody.run(j, rd.data);
}
});
redData.add(rd);
}


for (ReductionData<T> rd : redData) {
try {
rd.future.get();
if (rd.data != null) {
reducer.run(result, rd.data);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
}
}

下面是一个简单的测试示例: 使用非同步映射的并行字符计数器。

import java.util.*;


public class ParallelTest {
static class Counter {
int cnt;


Counter() {
cnt = 1;
}
}


public static void main(String[] args) {
String text = "More formally, if this map contains a mapping from a key k to a " +
"value v such that key compares equal to k according to the map's ordering, then " +
"this method returns v; otherwise it returns null.";
Map<Character, Counter> charCounter1 = new TreeMap<Character, Counter>();
Map<Character, Counter> charCounter2 = new TreeMap<Character, Counter>();


// first sequentially
for(int i=0; i < text.length(); i++) {
char c = text.charAt(i);
Counter cnt = charCounter1.get(c);
if (cnt == null) {
charCounter1.put(c, new Counter());
} else {
cnt.cnt++;
}
}
for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue().cnt);
}


// now parallel without synchronization
Parallel.For(0, text.length(), charCounter2,
// Creator
() -> new TreeMap<Character, Counter>(),
// Loop Body
(i, map) -> {
char c = text.charAt(i);
Counter cnt = map.get(c);
if (cnt == null) {
map.put(c, new Counter());
} else {
cnt.cnt++;
}
},
// Reducer
(result, map) -> {
for(Map.Entry<Character, Counter> entry: map.entrySet()) {
Counter cntR = result.get(entry.getKey());
if (cntR == null) {
result.put(entry.getKey(), entry.getValue());
} else {
cntR.cnt += entry.getValue().cnt;
}
}
}
);


// compare results
assert charCounter1.size() == charCounter2.size() : "wrong size: " + charCounter1.size() + ", " + charCounter2.size();
Iterator<Map.Entry<Character, Counter>> it2 = charCounter2.entrySet().iterator();
for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
Map.Entry<Character, Counter> entry2 = it2.next();
assert entry.getKey() == entry2.getKey() && entry.getValue().cnt == entry2.getValue().cnt : "wrong content";
}


System.out.println("Well done!");
}
}

这就是我在 Java7中使用的代码。

对于 Java8,您可以使用 为每个()

[更新]

平行班:

private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();
private static final int MAX_THREAD = NUM_CORES*2;


public static <T2 extends T, T> void For(final Iterable<T2> elements, final Operation<T> operation) {
if (elements != null) {
final Iterator<T2> iterator = elements.iterator();
if (iterator.hasNext()) {
final Throwable[] throwable = new Throwable[1];
final Callable<Void> callable = new Callable<Void>() {
boolean first = true;
@Override
public final Void call() throws Exception {
if ((first || operation.follow()) && iterator.hasNext()) {
T result;
result = iterator.next();
operation.perform(result);
if (first) {
synchronized (this) {
first = false;
}
}
}
return null;
}
};
final Runnable runnable = new Runnable() {
@Override
public final void run() {
while (iterator.hasNext()) {
try {
synchronized (callable) {
callable.call();
}
if (!operation.follow()) {
break;
}
} catch (Throwable t) {
t.printStackTrace();
synchronized (throwable) {
throwable[0] = t;
}
throw new RuntimeException(t);
}
}
}
};
final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD);
for (int threadIndex=0; threadIndex<MAX_THREAD && iterator.hasNext(); threadIndex++) {
executor.execute(runnable);
}
executor.shutdown();
while (!executor.isTerminated()) {
try {
Thread.sleep(0,1);
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
if (throwable[0] != null) throw new RuntimeException(throwable[0]);
}
}
}


public interface Operation<T> {
void perform(T pParameter);
boolean follow();
}

例子

@Test
public void test() {
List<Long> longList = new ArrayList<Long>();
for (long i = 0; i < 1000000; i++) {
longList.add(i);
}
final List<Integer> integerList = new LinkedList<>();
Parallel.For((Iterable<? extends Number>) longList, new Parallel.Operation<Number>() {


@Override
public void perform(Number pParameter) {
System.out.println(pParameter);
integerList.add(pParameter.intValue());
}


@Override
public boolean follow() {
return true;
}
});
for (Number num : integerList) {
System.out.println(num);
}
}

Monitoring

< a href = “/questions/tags/multithreading”class = “ post-tag”title = “ show questions tag & # 39;”rel = “ tag”> multithreading

在我的案例中,我发现 ForkJoinPool 和 IntStream 非常有帮助(使用有限数量的线程进行并行处理)。

C # :

static void MathParallel(int threads)
{
Parallel.For(1, partitions, new ParallelOptions { MaxDegreeOfParallelism = threads }, (i) => {
partitionScores[i] = Math.Sin(3*i);
});
}

与 Java 等效:

static void mathParallel(int threads) {
ForkJoinPool pool = new ForkJoinPool(threads);
pool.submit(()-> IntStream.range(0, partitions).parallel().forEach(i -> {
partitionScores[i] = Math.sin(3*i);
}));
pool.shutdown();
while (!pool.isTerminated()){
}
}