Java 执行器: 当任务完成时,如何在不阻塞的情况下得到通知?

假设我有一个满是任务的队列,我需要将它们提交给执行者服务。一次处理一个。我能想到的最简单的方法是:

  1. 从队列中取出任务
  2. 提交给遗嘱执行人
  3. 访问返回的 Future 并阻塞,直到有结果为止
  4. 从队列中取出另一个任务..。

然而,我尽量避免完全阻塞。如果我有10,000个这样的队列,它们需要一次处理一个任务,我将用完堆栈空间,因为它们中的大多数将持有被阻塞的线程。

我想要的是提交一个任务,并提供一个回调,当任务完成时调用。我将使用该回调通知作为发送下一个任务的标志。(function tionaljava 和 jetlang 显然使用这样的非阻塞算法,但我不能理解他们的代码)

如果没有编写自己的执行器服务,那么如何使用 JDK 的 java.util.current 来实现这一点呢?

(为我提供这些任务的队列本身可能会阻塞,但这是一个待以后解决的问题)

158163 次浏览

使用 强 > CountDownLatch

它来自 java.util.concurrent,并且正是在继续之前等待多个线程完成执行的方式。

为了达到您正在关注的回调效果,这确实需要一点额外的工作。也就是说,自己在一个单独的线程中处理这个问题,这个线程使用 CountDownLatch并等待它,然后继续通知您需要通知的任何内容。没有对回调的本机支持,或者类似的效果。


编辑: 既然我进一步理解了你的问题,我认为你做得太过了,没有必要。如果使用常规的 强 > SingleThreadExecutor,给它所有的任务,它将执行本机排队。

定义一个回调接口来接收您想在完成通知中传递的任何参数。然后在任务结束时调用它。

您甚至可以为 Runnable 任务编写一个通用包装器,并将它们提交给 ExecutorService。或者,参见下面的 Java8内置机制。

class CallbackTask implements Runnable {


private final Runnable task;


private final Callback callback;


CallbackTask(Runnable task, Callback callback) {
this.task = task;
this.callback = callback;
}


public void run() {
task.run();
callback.complete();
}


}

使用 CompletableFuture,Java8包含了一种更为复杂的方法来组合管道,在这些管道中,可以异步地、有条件地完成进程。下面是一个人为设计但完整的通知例子。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;


public class GetTaskNotificationWithoutBlocking {


public static void main(String... argv) throws Exception {
ExampleService svc = new ExampleService();
GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
f.thenAccept(listener::notify);
System.out.println("Exiting main()");
}


void notify(String msg) {
System.out.println("Received message: " + msg);
}


}


class ExampleService {


String work() {
sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
char[] str = new char[5];
ThreadLocalRandom current = ThreadLocalRandom.current();
for (int idx = 0; idx < str.length; ++idx)
str[idx] = (char) ('A' + current.nextInt(26));
String msg = new String(str);
System.out.println("Generated message: " + msg);
return msg;
}


public static void sleep(long average, TimeUnit unit) {
String name = Thread.currentThread().getName();
long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
try {
unit.sleep(timeout);
System.out.println(name + " awoke.");
} catch (InterruptedException abort) {
Thread.currentThread().interrupt();
System.out.println(name + " interrupted.");
}
}


public static long exponential(long avg) {
return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
}


}

如果您想确保没有任务将同时运行,那么使用 单线程执行器。任务将按照提交的顺序进行处理。您甚至不需要保存任务,只需将它们提交给执行。

ThreadPoolExecutor还有可以重写和使用的 beforeExecuteafterExecute钩子方法。以下是来自 ThreadPoolExecutorJavadocs的描述。

钩法

此类提供在每个任务执行之前和之后调用的受保护的可重写 beforeExecute(java.lang.Thread, java.lang.Runnable)afterExecute(java.lang.Runnable, java.lang.Throwable)方法。这些可用于操作执行环境; 例如,重新初始化 ThreadLocals、收集统计信息或添加日志条目。此外,可以重写方法 terminated()以执行在 Executor完全终止后需要执行的任何特殊处理。如果钩子或回调方法抛出异常,则内部工作线程可能反过来失败并突然终止。

您可以扩展 FutureTask类,并重写 done()方法,然后将 FutureTask对象添加到 ExecutorService,这样当 FutureTask立即完成时,done()方法将会调用。

使用 番石榴未来的可听 API并添加一个回调。参考网站:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
});

在 Java8中可以使用 完整的未来。下面是我的代码中的一个例子,我使用它从我的用户服务中获取用户,将他们映射到我的视图对象,然后更新我的视图或显示一个错误对话框(这是一个 GUI 应用程序) :

    CompletableFuture.supplyAsync(
userService::listUsers
).thenApply(
this::mapUsersToUserViews
).thenAccept(
this::updateView
).exceptionally(
throwable -> { showErrorDialogFor(throwable); return null; }
);

它异步执行。我使用两个私有方法: mapUsersToUserViewsupdateView

为了补充 Matt 的答案,这里有一个更加充实的示例来展示如何使用回调。

private static Primes primes = new Primes();


public static void main(String[] args) throws InterruptedException {
getPrimeAsync((p) ->
System.out.println("onPrimeListener; p=" + p));


System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
CompletableFuture.supplyAsync(primes::getNextPrime)
.thenApply((prime) -> {
System.out.println("getPrimeAsync(); prime=" + prime);
if (listener != null) {
listener.onPrime(prime);
}
return prime;
});
}

输出结果是:

    getPrimeAsync(); prime=241
onPrimeListener; p=241
Adios mi amigito

您可以使用 Callable 的实现,以便

public class MyAsyncCallable<V> implements Callable<V> {


CallbackInterface ci;


public MyAsyncCallable(CallbackInterface ci) {
this.ci = ci;
}


public V call() throws Exception {


System.out.println("Call of MyCallable invoked");
System.out.println("Result = " + this.ci.doSomething(10, 20));
return (V) "Good job";
}
}

CallbackInterface 是什么样的

public interface CallbackInterface {
public int doSomething(int a, int b);
}

现在主要班级看起来是这样的

ExecutorService ex = Executors.newFixedThreadPool(2);


MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);

这是使用番石榴的 ListenableFuture对 Pache 的答案进行的扩展。

特别是,Futures.transform()返回 ListenableFuture,因此可以用来链接异步调用。Futures.addCallback()返回 void,因此不能用于链接,但是有利于处理异步完成的成功/失败。

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());


// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
Futures.transform(database, database -> database.query(table, ...));


// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
Futures.transform(cursor, cursor -> cursorToFooList(cursor));


// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
public void onSuccess(List<Foo> foos) {
doSomethingWith(foos);
}
public void onFailure(Throwable thrown) {
log.error(thrown);
}
});

注意: 除了链接异步任务之外,Futures.transform()还允许您在单独的执行器上调度每个任务(本例中没有显示)。

使用 ExecutorService实现 Callback机制的简单代码

import java.util.concurrent.*;
import java.util.*;


public class CallBackDemo{
public CallBackDemo(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(5);


try{
for ( int i=0; i<5; i++){
Callback callback = new Callback(i+1);
MyCallable myCallable = new MyCallable((long)i+1,callback);
Future<Long> future = service.submit(myCallable);
//System.out.println("future status:"+future.get()+":"+future.isDone());
}
}catch(Exception err){
err.printStackTrace();
}
service.shutdown();
}
public static void main(String args[]){
CallBackDemo demo = new CallBackDemo();
}
}
class MyCallable implements Callable<Long>{
Long id = 0L;
Callback callback;
public MyCallable(Long val,Callback obj){
this.id = val;
this.callback = obj;
}
public Long call(){
//Add your business logic
System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
callback.callbackMethod();
return id;
}
}
class Callback {
private int i;
public Callback(int i){
this.i = i;
}
public void callbackMethod(){
System.out.println("Call back:"+i);
// Add your business logic
}
}

产出:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

主要注意事项:

  1. 如果您希望按先进先出顺序执行进程任务,请将 newFixedThreadPool(5)替换为 newFixedThreadPool(1)
  2. 如果你想在分析了上一个任务的 callback结果之后再处理下一个任务,只需要在下一行取消注释即可

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. You can replace newFixedThreadPool() with one of

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    取决于您的用例。

  4. 如果要异步处理回调方法

    传递一个共享的 abc 0到 Callable 任务

    Callable方法转换为 Callable/Runnable任务

    将回调任务推送到 ExecutorService or ThreadPoolExecutor

您可以在 Java 执行器中实现未来的任务,该执行器在任务完成时返回回调。

  • 对任务使用可调用而不是可运行
  • 可以在异步执行线程时执行非相关任务
  • 可以执行多个任务,并且可以得到每个任务的结果

下面是从任务返回随机整数值并打印在主线程上的类

public class ExecutorCallable {


public static void main(String[] args) throws ExecutionException, InterruptedException {


//fix number of threads
//blocking queue-thread safe
ExecutorService service = Executors.newFixedThreadPool(100);


//submit the tasks for execution
Future<Integer> result = service.submit(new Task());


//*** perform some unrelated operations


System.out.println("Result of submitted task is : " + result.get());//blocks until the future is ready after completion


System.out.println("Thread name " + Thread.currentThread().getName());
}


static class Task implements Callable<Integer> {


@Override
public Integer call() throws Exception {
return new Random().nextInt();
}
}
}

产出为:

Result of submitted task is : 16645418
Thread name main