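How to scale threads according to CPU cores?

I want to solve a mathematical problem with multiple threads in Java. My math problem can be split into work units, which I want to have solved in several threads.

I don't want a fixed number of threads working on it, but rather a number of threads that matches the number of CPU cores. My problem is that I couldn't find a simple tutorial on the internet for this. All I found were examples with a fixed number of threads.

How can this be done? Can you provide examples?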

On the Runtime class, there is a method called availableProcessors(). You can use that to figure out how many CPUs you have. Since your program is CPU bound, you would probably want to have (at most) one thread per available CPU.

You can determine the number of processors available to the Java Virtual Machine by using the Runtime instance method availableProcessors(), called as Runtime.getRuntime().availableProcessors(). Once you have determined the number of processors available, create that number of threads and split up your work accordingly.

Update: To further clarify, a Thread is just an Object in Java, so you can create it just like you would create any other object. So, let's say that you call the above method and find that it returns 2 processors. Awesome. Now, you can create a loop that generates a new Thread, and splits the work off for that thread, and fires off the thread. Here's some pseudocode to demonstrate what I mean:

    int processors = Runtime.getRuntime().availableProcessors();
    for (int i = 0; i < processors; i++) {
        Thread yourThread = new AThreadYouCreated();
        // You may need to pass in parameters depending on what work you are doing and how you set up your thread.
        yourThread.start();
    }

For more information on creating your own thread, head to this tutorial. Also, you may want to look at Thread Pooling for the creation of the threads.
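The loop above can be fleshed out into something runnable. The following is only a sketch under assumptions: the class name PerCoreThreads and the toy workload (summing the integers from 1 to n) are made up for illustration and stand in for your real work units.

```java
import java.util.ArrayList;
import java.util.List;

final class PerCoreThreads {
    // Sums the integers in [1, n] by giving each thread one contiguous slice.
    static long sumUpTo(long n) throws InterruptedException {
        int processors = Runtime.getRuntime().availableProcessors();
        long[] partial = new long[processors];           // one slot per thread, never shared
        List<Thread> threads = new ArrayList<>();
        long chunk = (n + processors - 1) / processors;  // ceiling division

        for (int i = 0; i < processors; i++) {
            final int index = i;
            final long from = i * chunk + 1;
            final long to = Math.min(n, (i + 1) * chunk);
            Thread t = new Thread(() -> {
                long sum = 0;
                for (long k = from; k <= to; k++) {
                    sum += k;
                }
                partial[index] = sum;
            });
            threads.add(t);
            t.start();
        }

        long total = 0;
        for (int i = 0; i < processors; i++) {
            threads.get(i).join();  // after join, the thread's write to partial[i] is visible
            total += partial[i];
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumUpTo(1_000_000));
    }
}
```

Each thread writes only to its own array slot, so no locking is needed; the main thread reads a slot only after joining the thread that wrote it.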

You probably want to look at the java.util.concurrent framework for this stuff too. Something like:

    ExecutorService e = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    // Do work using something like either
    e.execute(new Runnable() {
        public void run() {
            // do one task
        }
    });

or

    Future<String> future = e.submit(new Callable<String>() {
        public String call() throws Exception {
            return null;
        }
    });
    future.get();  // Will block until the result is available

This is a lot nicer than coping with your own thread pools etc.
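To make the snippets above concrete, here is a small self-contained sketch. The class name PooledWorkUnits and the squaring workload are assumptions for illustration; invokeAll is used instead of individual submit calls simply because it submits a whole batch and waits for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PooledWorkUnits {
    // Squares each input on a pool sized to the available processors.
    static List<Long> squareAll(List<Integer> inputs)
            throws InterruptedException, ExecutionException {
        ExecutorService pool =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int value : inputs) {
                tasks.add(() -> (long) value * value);  // one work unit per input
            }
            List<Long> results = new ArrayList<>();
            // invokeAll blocks until every task is done; futures come back in task order.
            for (Future<Long> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();  // let the worker threads exit
        }
    }
}
```

The number of work units can be much larger than the number of threads; the pool queues the extra tasks and runs them as threads become free.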

Doug Lea (author of the concurrent package) has this paper which may be relevant: http://gee.cs.oswego.edu/dl/papers/fj.pdf

The Fork/Join framework was added in Java SE 7. Below are a few more references:

http://www.ibm.com/developerworks/java/library/j-jtp11137/index.html Article by Brian Goetz

http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
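As a rough idea of what Fork/Join code looks like, here is a minimal sketch. The class RangeSum, its toy workload, and the THRESHOLD value are assumptions chosen for illustration, not taken from the linked articles.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class RangeSum extends RecursiveTask<Long> {
    private static final long THRESHOLD = 10_000;  // arbitrary cutoff for this sketch
    private final long from;  // inclusive
    private final long to;    // inclusive

    RangeSum(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from < THRESHOLD) {
            // Small enough: compute directly.
            long sum = 0;
            for (long k = from; k <= to; k++) {
                sum += k;
            }
            return sum;
        }
        long mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid + 1, to);
        left.fork();                           // schedule the left half asynchronously
        return right.compute() + left.join();  // do the right half here, then combine
    }

    // Runs the task on a pool whose parallelism matches the available processors.
    static long sum(long from, long to) {
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        try {
            return pool.invoke(new RangeSum(from, to));
        } finally {
            pool.shutdown();
        }
    }
}
```

The work is split recursively rather than up front, so idle threads can steal halves that other threads have forked but not yet started.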

The standard way is the Runtime.getRuntime().availableProcessors() method. It returns the number of logical processors available to the JVM, which on most standard CPUs is the optimal thread count for CPU-bound work and may differ from the physical core count (for example, with hyper-threading). Therefore this is what you are looking for.

Example:

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

Do NOT forget to shut down the executor service like this (or your program won't exit):

service.shutdown();

Here is a quick outline of how to set up Future-based multithreaded code (off-topic, for illustration):

    CompletionService<YourCallableImplementor> completionService =
            new ExecutorCompletionService<YourCallableImplementor>(service);
    ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();
    for (String computeMe : elementsToCompute) {
        futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
    }

Then you need to keep track of how many results you expect and retrieve them like this:

    try {
        int received = 0;
        while (received < elementsToCompute.size()) {
            Future<YourCallableImplementor> resultFuture = completionService.take();
            YourCallableImplementor result = resultFuture.get();
            received++;
        }
    } finally {
        service.shutdown();
    }
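Put together, the outline above might look like the following self-contained sketch. The class name CompletionOrderDemo and the workload (string lengths) are assumptions standing in for YourCallableImplementor and your real computation.

```java
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionOrderDemo {
    // Returns the total length of all strings, consuming results as tasks finish.
    static int totalLength(List<String> elementsToCompute)
            throws InterruptedException, ExecutionException {
        ExecutorService service =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            CompletionService<Integer> completionService =
                    new ExecutorCompletionService<>(service);
            for (String computeMe : elementsToCompute) {
                completionService.submit(() -> computeMe.length());
            }
            int total = 0;
            // take() blocks until the next finished task is available, in completion order.
            for (int received = 0; received < elementsToCompute.size(); received++) {
                total += completionService.take().get();
            }
            return total;
        } finally {
            service.shutdown();
        }
    }
}
```

Because take() hands back whichever task finished first, a slow work unit does not hold up processing of the fast ones.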

Option 1:

newWorkStealingPool from Executors

public static ExecutorService newWorkStealingPool()

Creates a work-stealing thread pool using all available processors as its target parallelism level.

With this API, you don't need to pass the number of cores to the factory method yourself.

Implementation of this API from grepcode

    /**
     * Creates a work-stealing thread pool using all
     * {@link Runtime#availableProcessors available processors}
     * as its target parallelism level.
     * @return the newly created thread pool
     * @see #newWorkStealingPool(int)
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
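A short usage sketch for Option 1, assuming Java 8 or later. The class name WorkStealingDemo and the sum-of-squares workload are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class WorkStealingDemo {
    // Computes 1^2 + 2^2 + ... + n^2, one task per term.
    static long sumOfSquares(int n) throws InterruptedException, ExecutionException {
        // No thread count is passed; the pool targets all available processors.
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final long value = i;
                Callable<Long> task = () -> value * value;
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> future : futures) {
                total += future.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```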

Option 2:

The newFixedThreadPool API from Executors, or one of the other newXXX factory methods, all of which return an ExecutorService

public static ExecutorService newFixedThreadPool(int nThreads)

Replace nThreads with Runtime.getRuntime().availableProcessors().

Option 3:

ThreadPoolExecutor

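    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)

Pass Runtime.getRuntime().availableProcessors() as maximumPoolSize. You will usually want to pass it as corePoolSize too, because with an unbounded work queue the pool never grows beyond corePoolSize.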
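A minimal sketch of Option 3. Using the same value for corePoolSize and maximumPoolSize, the 60-second keep-alive, and the unbounded LinkedBlockingQueue are all assumptions for illustration; the class name CoreSizedExecutor is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CoreSizedExecutor {
    // Builds a pool with exactly one thread per available processor.
    static ThreadPoolExecutor create() {
        int processors = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                processors,             // corePoolSize
                processors,             // maximumPoolSize
                60L, TimeUnit.SECONDS,  // keepAliveTime: only affects threads beyond corePoolSize
                new LinkedBlockingQueue<Runnable>());  // unbounded work queue
    }
}
```

The returned executor is used like any other ExecutorService (execute, submit, shutdown); the explicit constructor is mainly useful when you need to control the queue or rejection behavior yourself.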