首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

AbstractQueuedSynchronizer的引见

2013-12-13 
AbstractQueuedSynchronizer的介绍Node {int waitStatusNode prevNode nextNode nextWaiterThread thr

AbstractQueuedSynchronizer的介绍
Node { int waitStatus; Node prev; Node next; Node nextWaiter; Thread thread;}

?waitStatus其中包含的状态有:

1. CANCELLED,值为1,表示当前的线程被取消;

2. SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;

3. CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;

4. PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

? ? ?值为0,表示当前节点在sync队列中,等待着获取锁

?

AQS中冗余了Head节点。也就是Head是第一个节点。但是Tail只是其中的一个指针引用。

?

AQS执行流程:

?

以下是排它锁获取的主要流程:

? ?tryAcquire:尝试是否需要使用排它锁(一般都是业务决定的,根据state这个成员变量的值,比如ReentrantLock中公平锁或者非公平锁的使用)

??acquireQueued:等待获取排它锁的使用权。在这一步中,addWaiter(Node.EXCLUSIVE)是将此线程操作放到排它锁的队列中等待,期望获取使用权。addWaiter是将此线程压入等待队列中

?

 /**     * Acquires in exclusive mode, ignoring interrupts.  Implemented     * by invoking at least once {@link #tryAcquire},     * returning on success.  Otherwise the thread is queued, possibly     * repeatedly blocking and unblocking, invoking {@link     * #tryAcquire} until success.  This method can be used     * to implement method {@link Lock#lock}.     *     * @param arg the acquire argument.  This value is conveyed to     *        {@link #tryAcquire} but is otherwise uninterpreted and     *        can represent anything you like.     */    public final void acquire(int arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }

? acquireQueued中,因为Head是冗余节点,所以每个Node肯定都有前置节点(除了head),所以如果前置节点是head,那么就需要再次调用tryAcquire,是否需要使用排它锁,如果不需要的话,直接将此节点当做Head节点。否则就需要判断此节点是否挂起了。其中shouldParkAfterFailedAcquire方法主要修改节点状态的,标识为需要唤醒状态。parkAndCheckInterrupt就是调用park方法并且返回此线程是否中断。

?

 /**     * Acquires in exclusive uninterruptible mode for thread already in     * queue. Used by condition wait methods as well as acquire.     *     * @param node the node     * @param arg the acquire argument     * @return {@code true} if interrupted while waiting     */    final boolean acquireQueued(final Node node, int arg) {        try {            boolean interrupted = false;            for (;;) {                final Node p = node.predecessor();                if (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null; // help GC                    return interrupted;                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } catch (RuntimeException ex) {            cancelAcquire(node);            throw ex;        }    }
? /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }?/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }?加粗部分为共享锁和排它锁的区别。共享锁允许获取多个线程获取,而排它锁只允许一个。/** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }?共享锁的释放过程:/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }??AQS只是提供了共享锁和排它锁的执行流程,具体的业务逻辑控制需要自己实现。在AQS的基础上,提供了很多有用的锁机制。CountDownLatch:开闭锁,一般当做阀值开关。ReentrantLock:可重入锁。和synchroized关键字差不多,在排它锁的基础上做业务,已经持有锁的线程,可以再次获取锁,锁的计数器加1,release的时候减一。?其他:Condition介绍

?

热点排行