FutureTask介绍
java.util.concurrent.FutureTask<V>
?
介绍:
FutureTask实现了RunnableFuture接口,具备了Runnable和Future的所有特性。? ? Future提供了一种获得异步线程运行结果的机制。可以通过调用get方法得到结果,如果线程还未结束,会阻塞当前线程直至结果返回或中断发生。Future一般和Callable以及ExecutorService配合使用。
?
?
?
?
?
?
?
?
?
?
?
?
?
?
FutureTask中也是用AQS处理同步机制。实现原理非常简单就是用了一个共享锁,然后在其内部做了Task的一些状态管理。?
?
/* * 这个类实现了同步机制 定义了Task的状态。 */private final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -7828117401763700385L; /** State value representing that task is running */ private static final int RUNNING = 1; /** State value representing that task ran */ private static final int RAN = 2; /** State value representing that task was cancelled */ private static final int CANCELLED = 4; /** The underlying callable */ private final Callable<V> callable; /** The result to return from get() */ private V result; /** The exception to throw from get() */ private Throwable exception; /** * The thread running task. When nulled after set/cancel, this * indicates that the results are accessible. Must be * volatile, to ensure visibility upon completion. */ private volatile Thread runner; Sync(Callable<V> callable) { this.callable = callable; } private boolean ranOrCancelled(int state) { return (state & (RAN | CANCELLED)) != 0; } /** * Implements AQS base acquire to succeed if ran or cancelled */ protected int tryAcquireShared(int ignore) { return innerIsDone()? 1 : -1; } /** * Implements AQS base release to always signal after setting * final done status by nulling runner thread. */ protected boolean tryReleaseShared(int ignore) { runner = null; return true; } boolean innerIsCancelled() { return getState() == CANCELLED; } boolean innerIsDone() { return ranOrCancelled(getState()) && runner == null; } /*** * Future的get方法调用此方法,当Result没有返回的时候,共享锁不会释放,所以,线程会被阻塞掉。 */ V innerGet() throws InterruptedException, ExecutionException { acquireSharedInterruptibly(0); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } //设置Result。 V是由Callable返回的结果。 void innerSet(V v) { for (;;) {int s = getState();if (s == RAN) return; if (s == CANCELLED) { // aggressively release to set runner to null, // in case we are racing with a cancel request // that will try to interrupt runner releaseShared(0); return; }if (compareAndSetState(s, RAN)) { result = v; releaseShared(0); done(); return; } } } //如果后台出现Exception时的处理 void innerSetException(Throwable t) { for (;;) {int s = getState();if (s == RAN) return; if (s == CANCELLED) { // aggressively release to set runner to null, // in case we are racing with a cancel request // that will try to interrupt runner releaseShared(0); return; }if (compareAndSetState(s, RAN)) { exception = t; result = null; releaseShared(0); done(); return; } } } //调用cancel后的处理逻辑 boolean innerCancel(boolean mayInterruptIfRunning) { for (;;) {int s = getState();if (ranOrCancelled(s)) return false;if (compareAndSetState(s, CANCELLED)) break; } if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); } releaseShared(0); done(); return true; } //Task的启动入口, 这里会将Task的状态置为Running状态,如果result返回改为RUN,抛出异常后状态改为RUN并且保存Exception现场。 void innerRun() { if (!compareAndSetState(0, RUNNING)) return; try { runner = Thread.currentThread(); if (getState() == RUNNING) // recheck after setting thread innerSet(callable.call()); else releaseShared(0); // cancel } catch (Throwable ex) { innerSetException(ex); } } //Task启动入口,和上面一个不同,此方法不需要保存返回结果。 boolean innerRunAndReset() { if (!compareAndSetState(0, RUNNING)) return false; try { runner = Thread.currentThread(); if (getState() == RUNNING) callable.call(); // don't set result runner = null; return compareAndSetState(RUNNING, 0); } catch (Throwable ex) { innerSetException(ex); return false; } } }?
?