简介
在之前已经有6篇关于AQS源码分析的文章了,关于源码分析的一些问题可以去看看我之前的文章,文章连接可以在文末查看。这一篇文章主要是对AQS的一些总结,或者说是面经。
AQS是什么
AQS 全称是AbstractQueuedSynchronizer,在java.util.concurrent.locks
包下面,是一个抽象的可以实现阻塞线程、排队控制、唤醒线程等操作的同步器基础框架类,AQS 可以实现排它锁、共享锁、条件锁、计数器等相关功能。
AQS 对资源的共享方式
AQS定义两种资源共享方式
// AQS.NODE
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
- Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
- Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。
ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。
两个队列
AQS 里面有两个队列,我称为同步队列和条件队列。条件队列主要是实现条件锁时用到的队列,同步队列就是维护唤醒线程的队列。
-
同步队列 主要用于维护获取互斥锁失败时入队的线程
```java private transient volatile Node head; private transient volatile Node tail; ```
-
条件队列 调用
await()
的时候会释放锁,然后线程会加入到条件队列,调用signal()
唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁```java /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; ```
父类AbstractOwnableSynchronizer
AQS 继承的父类AbstractOwnableSynchronizer,该类仅一个属性用于记录当前持有锁的线程,提供get/set方法。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
}
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
// 独占模式下的线程
private transient Thread exclusiveOwnerThread;
// 设置独占线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
内部类 Node
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 结点状态
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 结点所对应的线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;
// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱结点,若前驱结点为空,抛出异常
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
if (p == null) // 前驱结点为空,抛出异常
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}
// 无参构造方法
Node() { // Used to establish initial head or SHARED marker
}
// 构造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 构造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
每个被阻塞的线程都会被封装成一个Node节点,节点内包含了被阻塞的线程,之后node会被加入到队列。
Node 节点状态
AQS 定义了5个队列中节点状态:
- 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
CANCELLED
,值为1,表示当前的线程被取消;SIGNAL
,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;CONDITION
,值为-2,表示当前节点在等待condition,也就是在condition队列中;PROPAGATE
,值为-3,表示当前场景下后续的acquireShared能够得以执行;
类的内部类 ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
private transient Node firstWaiter;
private transient Node lastWaiter;
public ConditionObject() { }
}
此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下:
public interface Condition {
// 等待,当前线程在接到信号或被中断之前一直处于等待状态
void await() throws InterruptedException;
// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
void awaitUninterruptibly();
//等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
void signal();
// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
void signalAll();
}
重要属性
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 版本号
private static final long serialVersionUID = 7373984972572414691L;
// 头结点
private transient volatile Node head;
// 尾结点
private transient volatile Node tail;
// 状态
private volatile int state;
// 自旋时间
static final long spinForTimeoutThreshold = 1000L;
// Unsafe类实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state内存偏移地址
private static final long stateOffset;
// head内存偏移地址
private static final long headOffset;
// state内存偏移地址
private static final long tailOffset;
// tail内存偏移地址
private static final long waitStatusOffset;
// next内存偏移地址
private static final long nextOffset;
// 静态初始化块
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
}
同步状态state
state
字段是一个非常重要的字段,可以基于state字段的值定义出不同的同步锁功能,比如:
- 基于state 的值实现排他锁 state 值为1代表锁被占用,值为0时代表锁未被占用。 代表类:ReentrantLock
- 基于state的值实现读写锁 state 被分成两部分,高16位记录读锁次数,低16位记录写锁次数 代表类:ReentrantReadWriteLock
- 基于state的值实现限制线程数 初始化一个state值,表示最大限制数,即可以做到允许最多N个线程同时运行,达到限流效果 代表类:Semaphore
- 基于state的值实现倒计数 初始化一个state值,state值为0时触发唤醒动作 代表类:CountDownLatch
支持重写的API
AQS 提供了 5 个可以自定义实现功能的API方法,基于这些方法,则可以实现不同类型的锁功能。
- protected boolean tryAcquire(int arg) 尝试一次获得一个排它锁
- protected boolean tryRelease(int arg) 尝试一次释放一个排它锁
- protected int tryAcquireShared(int arg) 尝试一次获得一个共享锁
- protected boolean tryReleaseShared(int arg) 尝试一次释放一个共享锁
- protected boolean isHeldExclusively() 验证排它锁是否被占用
提供的模版方法
下面的这些模版方法,都用到了上面可以重写的API方法。
-
基于
tryAcquire
API提供的模版方法-
获得一个排它锁,直到成功获得锁
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
-
获得一个排它锁,可被中断
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
-
获得一个排它锁,可超时或中断
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
其中
tryAcquire(arg)
方法是需要自己实现的方法 -
-
基于tryAcquireShared API提供的模版方法
-
获得一个共享锁,直到成功获得锁
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
其中
tryAcquireShared(arg)
方法是需要自己实现的方法 -
获得一个共享锁,可被中断
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
-
获得一个共享锁,支持超时或中断
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
-
-
释放一个排它锁
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
其中
tryRelease(arg)
方法是需要自己实现的方法 -
释放一个共享锁
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
其中
tryReleaseShared(arg)
方法是需要自己实现的方法
其他
还有一个与AQS非常相似的类——AbstractQueuedLongSynchronizer,从命名上来看,多了一个Long,从源码上来看,他们两个有完全相同的结构、属性和方法,唯一不同之处就在于所有与状态相关的参数和结果都定于为long类型,而不是int类型,当需要创建64位状态的同步器(例如多级锁和屏障)时,AbstractQueuedLongSynchronizer类可能很有用。