Callable提供了带返回值的子线程执行结果,Future提供了获取子线程结果的途径
1 2 3 4
| Callable<String> callable = () -> null; ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> future = executor.submit(callable); System.out.println(future.get());
|
但是,这种直接get()
的方式是同步阻塞的,当然,如果轮询isDone()
的话仍然是换汤不换药。关于老生常谈的同步、异步、阻塞、非阻塞,这篇《I/O模型》从Java的视角出发来讲解,特别是NIO
和AIO
,指出AIO
并不是字面上的异步含义,值得一看。
那么对于Future
模式,除了上文的将来式get()
这种不优雅的同步阻塞方案,还有没有其他的方式可以拿到子线程结果呢?
很容易想到的一种方式是使用回调。如AIO
提供了java.nio.channels.CompletionHandler
作为回调接口,当I/O操作结束后,系统将会调用CompletionHandler
的completed
或failed
方法来结束一次调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup .withThreadPool(executor); AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel .open(channelGroup) .bind(new InetSocketAddress(serverPort)); serverChannel.accept( null, new CompletionHandler<AsynchronousSocketChannel, Object>() { @Override public void completed(AsynchronousSocketChannel result, Object attach) { serverChannel.accept(null, this); }
@Override public void failed(Throwable exc, Object attach) { } });
AsynchronousSocketChannel clientChannel = AsynchronousSocketChannel.open(); clientChannel.connect(new InetSocketAddress(clientPort)); clientChannel.read( ByteBuffer.allocate(1024), null, new CompletionHandler<Integer, Object>() { @Override public void completed(Integer result, Object attachment) { }
@Override public void failed(Throwable exc, Object attachment) { } });
|
JDK5
的NIO
已经提供了相关的API,虽然操作更为复杂一些,但在此基础上,诸如Netty
等通信框架已经发展的十分繁荣。AIO
似乎并没有达到预计的效果,但这种回调方式显然要比直接get()
的粗暴方式要更为优雅。
那么有没有不那么粗暴又方便一些的回调方案呢?
答案是有的,一些开源的工具已经为我们提供了这个功能,例如接下来要介绍的Google扩展包Guava
中提供的并发工具com.google.common.util.concurrent.ListenableFuture
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public static void main(String... args) { ListeningExecutorService executorService = MoreExecutors .listeningDecorator(Executors.newCachedThreadPool());
final ListenableFuture<Integer> listenableFuture = executorService.submit(() -> { System.out.println("call execute.."); TimeUnit.SECONDS.sleep(1); return 7; });
Futures.addCallback(listenableFuture, new FutureCallback() { public void onSuccess(Object o) { System.out.println("异步处理成功,result="+o); }
public void onFailure(Throwable throwable) { System.out.println("异步处理失败,e="+throwable); } }, MoreExecutors.directExecutor());
System.out.println("不会阻塞");
}
|
和Future
的get()
会阻塞主线程不同,带监听器的ListenableFuture
可以异步处理Callable
结果,最终打印结果:
1 2 3
| call execute.. 不会阻塞 异步处理成功,result=7
|
从ListeningExecutorService
这个修饰后的线程池出发,看看如何修饰后如何将提交的Callable
输出为ListenableFuture
,而非Future
,主要来看submit
方法。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public abstract class AbstractListeningExecutorService extends AbstractExecutorService implements ListeningExecutorService { @Override public <T> ListenableFuture<T> submit(Callable<T> task) { return (ListenableFuture<T>) super.submit(task); } @Override protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return TrustedListenableFutureTask.create(callable); } }
public abstract class AbstractExecutorService implements ExecutorService { public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } }
|
这里的submit
和newTaskFor
最终执行的都是子类AbstractListeningExecutorService
中的,这是Guava
包内的,而非J.C.U
包内的AbstractExecutorService
,返回了TrustedListenableFutureTask
的实例,看一下依赖关系,能很清楚地看到这是ListenableFuture
的一个实现类。

那么是如何进行回调的呢?接着从刚才的实现类TrustedListenableFutureTask
来看,主要做的工作是InterruptibleTask
里,实现了Runnable
的run
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| class TrustedListenableFutureTask<V> extends FluentFuture.TrustedFuture<V> implements RunnableFuture<V> { private volatile InterruptibleTask<?> task;
TrustedListenableFutureTask(Callable<V> callable) { this.task = new TrustedFutureInterruptibleTask(callable); } }
abstract class InterruptibleTask<T> extends AtomicReference<Runnable> implements Runnable { private static final Runnable DONE = new DoNothingRunnable(); private static final Runnable INTERRUPTING = new DoNothingRunnable(); private static final Runnable PARKED = new DoNothingRunnable(); private static final int MAX_BUSY_WAIT_SPINS = 1000; @Override public final void run() { Thread currentThread = Thread.currentThread(); if (!compareAndSet(null, currentThread)) { return; } boolean run = !isDone(); T result = null; Throwable error = null; try { if (run) { result = runInterruptibly(); } } catch (Throwable t) { error = t; } finally { if (!compareAndSet(currentThread, DONE)) { boolean restoreInterruptedBit = false; int spinCount = 0; Runnable state = get(); while (state == INTERRUPTING || state == PARKED) { spinCount++; if (spinCount > MAX_BUSY_WAIT_SPINS) { if (state == PARKED || compareAndSet(INTERRUPTING, PARKED)) { restoreInterruptedBit = Thread.interrupted()||restoreInterruptedBit; LockSupport.park(this); } } else { Thread.yield(); } state = get(); } if (restoreInterruptedBit) { currentThread.interrupt(); } } if (run) { afterRanInterruptibly(result, error); } } } abstract boolean isDone();
abstract T runInterruptibly() throws Exception;
abstract void afterRanInterruptibly(@Nullable T result, @Nullable Throwable error); }
|