Introduction to FutureTask

2013-12-09 

java.util.concurrent.FutureTask<V>

Overview:

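FutureTask implements the RunnableFuture interface, so it has all the features of both Runnable and Future. Future provides a mechanism for obtaining the result of an asynchronous computation. Calling get() returns the result; if the task has not finished yet, the calling thread blocks until the result is available or the thread is interrupted. Future is typically used together with Callable and ExecutorService.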
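
To make the dual Runnable/Future role concrete, here is a minimal usage sketch. The class name FutureTaskDemo and the helper sumOnWorker are hypothetical names chosen for illustration; only the java.util.concurrent calls come from the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {
    // Hypothetical helper: sums 1..n on a worker thread and blocks for the result.
    static int sumOnWorker(final int n) {
        Callable<Integer> job = () -> {
            int sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        };
        FutureTask<Integer> task = new FutureTask<>(job);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.execute(task);   // used as a Runnable: handed to the executor
            return task.get();    // used as a Future: blocks until call() has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // call() threw; the original exception is available as the cause
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOnWorker(100)); // prints 5050
    }
}
```

Submitting the Callable directly with ExecutorService.submit would also return a Future; wrapping it in a FutureTask by hand is shown here only because FutureTask is the subject of this article.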

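In the JDK source shown here, FutureTask also relies on AQS (AbstractQueuedSynchronizer) for synchronization. The principle is simple: it uses AQS in shared mode as a shared lock and manages the task's states inside that synchronizer.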
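
The shared-lock idea can be sketched in isolation. The following is a simplified illustration, not the JDK's code: ResultGate, its nested Sync, and handOff are hypothetical names, and the sketch assumes set() is called once by a single thread. It leaves out cancellation, exception handling, and the runner thread that the real class tracks.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class ResultGate<V> {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;
        private static final int DONE = 1;

        // A shared acquire succeeds only once the gate has been opened.
        @Override
        protected int tryAcquireShared(int ignore) {
            return getState() == DONE ? 1 : -1;
        }

        // Opening the gate always succeeds and wakes every waiting thread.
        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(DONE);
            return true;
        }

        boolean isDone() {
            return getState() == DONE;
        }
    }

    private final Sync sync = new Sync();
    private V value;   // written before releaseShared, read after a successful acquire

    void set(V v) {
        value = v;
        sync.releaseShared(0);
    }

    V get() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);   // blocks until set() has run
        return value;
    }

    boolean isDone() {
        return sync.isDone();
    }

    // Demo: a second thread publishes a value; the caller blocks until it arrives.
    static String handOff(String message) {
        ResultGate<String> gate = new ResultGate<>();
        new Thread(() -> gate.set(message)).start();
        try {
            return gate.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello"));   // prints hello
    }
}
```

Because the acquire is shared rather than exclusive, any number of threads can be parked in get() at the same time, and a single releaseShared lets all of them through.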

/*
 * This class implements the synchronization mechanism and defines the task states.
 */
private final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -7828117401763700385L;

    /** State value representing that task is running */
    private static final int RUNNING   = 1;
    /** State value representing that task ran */
    private static final int RAN       = 2;
    /** State value representing that task was cancelled */
    private static final int CANCELLED = 4;

    /** The underlying callable */
    private final Callable<V> callable;
    /** The result to return from get() */
    private V result;
    /** The exception to throw from get() */
    private Throwable exception;

    /**
     * The thread running task. When nulled after set/cancel, this
     * indicates that the results are accessible.  Must be
     * volatile, to ensure visibility upon completion.
     */
    private volatile Thread runner;

    Sync(Callable<V> callable) {
        this.callable = callable;
    }

    private boolean ranOrCancelled(int state) {
        return (state & (RAN | CANCELLED)) != 0;
    }

    /**
     * Implements AQS base acquire to succeed if ran or cancelled
     */
    protected int tryAcquireShared(int ignore) {
        return innerIsDone() ? 1 : -1;
    }

    /**
     * Implements AQS base release to always signal after setting
     * final done status by nulling runner thread.
     */
    protected boolean tryReleaseShared(int ignore) {
        runner = null;
        return true;
    }

    boolean innerIsCancelled() {
        return getState() == CANCELLED;
    }

    boolean innerIsDone() {
        return ranOrCancelled(getState()) && runner == null;
    }

    /**
     * Future.get() calls this method. Until a result is available the
     * shared lock has not been released, so the calling thread blocks.
     */
    V innerGet() throws InterruptedException, ExecutionException {
        acquireSharedInterruptibly(0);
        if (getState() == CANCELLED)
            throw new CancellationException();
        if (exception != null)
            throw new ExecutionException(exception);
        return result;
    }

    V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
        if (!tryAcquireSharedNanos(0, nanosTimeout))
            throw new TimeoutException();
        if (getState() == CANCELLED)
            throw new CancellationException();
        if (exception != null)
            throw new ExecutionException(exception);
        return result;
    }

    // Sets the result; v is the value returned by the Callable.
    void innerSet(V v) {
        for (;;) {
            int s = getState();
            if (s == RAN)
                return;
            if (s == CANCELLED) {
                // aggressively release to set runner to null,
                // in case we are racing with a cancel request
                // that will try to interrupt runner
                releaseShared(0);
                return;
            }
            if (compareAndSetState(s, RAN)) {
                result = v;
                releaseShared(0);
                done();
                return;
            }
        }
    }

    // Handles an exception thrown by the task.
    void innerSetException(Throwable t) {
        for (;;) {
            int s = getState();
            if (s == RAN)
                return;
            if (s == CANCELLED) {
                // aggressively release to set runner to null,
                // in case we are racing with a cancel request
                // that will try to interrupt runner
                releaseShared(0);
                return;
            }
            if (compareAndSetState(s, RAN)) {
                exception = t;
                result = null;
                releaseShared(0);
                done();
                return;
            }
        }
    }

    // Logic executed when cancel() is called.
    boolean innerCancel(boolean mayInterruptIfRunning) {
        for (;;) {
            int s = getState();
            if (ranOrCancelled(s))
                return false;
            if (compareAndSetState(s, CANCELLED))
                break;
        }
        if (mayInterruptIfRunning) {
            Thread r = runner;
            if (r != null)
                r.interrupt();
        }
        releaseShared(0);
        done();
        return true;
    }

    // Entry point for running the task. Sets the state to RUNNING. When the
    // call returns a result the state becomes RAN; when it throws, the state
    // also becomes RAN and the exception is saved.
    void innerRun() {
        if (!compareAndSetState(0, RUNNING))
            return;
        try {
            runner = Thread.currentThread();
            if (getState() == RUNNING) // recheck after setting thread
                innerSet(callable.call());
            else
                releaseShared(0); // cancel
        } catch (Throwable ex) {
            innerSetException(ex);
        }
    }

    // Alternative entry point. Unlike innerRun, it does not store the
    // result, and it resets the state so the task can run again.
    boolean innerRunAndReset() {
        if (!compareAndSetState(0, RUNNING))
            return false;
        try {
            runner = Thread.currentThread();
            if (getState() == RUNNING)
                callable.call(); // don't set result
            runner = null;
            return compareAndSetState(RUNNING, 0);
        } catch (Throwable ex) {
            innerSetException(ex);
            return false;
        }
    }
}
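
The three ways innerGet can finish (a normal result, an ExecutionException, or a CancellationException) are observable through the public API. The sketch below is illustrative only: FutureTaskOutcomes and its helper methods are hypothetical names, and each task is run on the calling thread with run() to keep the example deterministic.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskOutcomes {
    // Maps each possible outcome of get() to a short description.
    static String describe(FutureTask<String> task) {
        try {
            return "result: " + task.get();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String normal() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        task.run();                 // completes normally, result is stored
        return describe(task);
    }

    static String failing() {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run();                 // exception is captured, not propagated
        return describe(task);
    }

    static String cancelled() {
        FutureTask<String> task = new FutureTask<>(() -> "never");
        task.cancel(true);          // cancelled before it ever ran
        task.run();                 // has no effect on a cancelled task
        return describe(task);
    }

    public static void main(String[] args) {
        System.out.println(normal());     // prints result: ok
        System.out.println(failing());    // prints failed: boom
        System.out.println(cancelled());  // prints cancelled
    }
}
```

In the failing case the exception thrown by call() is stored by the task and rethrown from get() wrapped in an ExecutionException, which matches what innerSetException and innerGet do in the source above.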