AbstractQueuedSynchronizer的介绍
Node { int waitStatus; Node prev; Node next; Node nextWaiter; Thread thread;}
?waitStatus其中包含的状态有:
1. CANCELLED,值为1,表示当前的线程被取消;
2. SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
3. CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
4. PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
? ? ?值为0,表示当前节点在sync队列中,等待着获取锁
?
AQS中冗余了Head节点。也就是Head是第一个节点。但是Tail只是其中的一个指针引用。
?
AQS执行流程:
?
以下是排它锁获取的主要流程:
? ?tryAcquire:尝试是否需要使用排它锁(一般都是业务决定的,根据state这个成员变量的值,比如ReentrantLock中公平锁或者非公平锁的使用)
??acquireQueued:等待获取排它锁的使用权。在这一步中,addWaiter(Node.EXCLUSIVE)是将此线程操作放到排它锁的队列中等待,期望获取使用权。addWaiter是将此线程压入等待队列中
?
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
? acquireQueued中,因为Head是冗余节点,所以每个Node肯定都有前置节点(除了head),所以如果前置节点是head,那么就需要再次调用tryAcquire,是否需要使用排它锁,如果不需要的话,直接将此节点当做Head节点。否则就需要判断此节点是否挂起了。其中shouldParkAfterFailedAcquire方法主要修改节点状态的,标识为需要唤醒状态。parkAndCheckInterrupt就是调用park方法并且返回此线程是否中断。
?
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }? /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }?/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }?加粗部分为共享锁和排它锁的区别。共享锁允许获取多个线程获取,而排它锁只允许一个。/** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }?共享锁的释放过程:/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }??AQS只是提供了共享锁和排它锁的执行流程,具体的业务逻辑控制需要自己实现。在AQS的基础上,提供了很多有用的锁机制。CountDownLatch:开闭锁,一般当做阀值开关。ReentrantLock:可重入锁。和synchroized关键字差不多,在排它锁的基础上做业务,已经持有锁的线程,可以再次获取锁,锁的计数器加1,release的时候减一。?其他:Condition介绍
?