AbstractQueuedSynchronizer(AQS) 总结篇

Published on November 23, 20204 min read

简介

在之前已经有6篇关于AQS源码分析的文章了,关于源码分析的一些问题可以去看看我之前的文章,文章连接可以在文末查看。这一篇文章主要是对AQS的一些总结,或者说是面经。

AQS是什么

AQS 全称是AbstractQueuedSynchronizer,在java.util.concurrent.locks包下面,是一个抽象的可以实现阻塞线程、排队控制、唤醒线程等操作的同步器基础框架类,AQS 可以实现排它锁、共享锁、条件锁、计数器等相关功能。

AQS 对资源的共享方式

AQS定义两种资源共享方式

// AQS.NODE
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
  • Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
    • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
  • Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。

ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。

两个队列

AQS 里面有两个队列,我称为同步队列和条件队列。条件队列主要是实现条件锁时用到的队列,同步队列就是维护唤醒线程的队列。

  1. 同步队列 主要用于维护获取互斥锁失败时入队的线程

    private transient volatile Node head;
    private transient volatile Node tail;
  2. 条件队列 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁

    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

父类AbstractOwnableSynchronizer

AQS 继承的父类AbstractOwnableSynchronizer,该类仅一个属性用于记录当前持有锁的线程,提供get/set方法。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
...
}
 
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
 
    private static final long serialVersionUID = 3737899427754241961L;
 
    protected AbstractOwnableSynchronizer() { }
    // 独占模式下的线程
    private transient Thread exclusiveOwnerThread;
    // 设置独占线程
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
 
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

内部类 Node

static final class Node {
    // 模式,分为共享与独占
    // 共享模式
    static final Node SHARED = new Node();
    // 独占模式
    static final Node EXCLUSIVE = null;
    // 结点状态
    // CANCELLED,值为1,表示当前的线程被取消
    // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
    // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
    // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
    // 值为0,表示当前节点在sync队列中,等待着获取锁
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
 
    // 结点状态
    volatile int waitStatus;
    // 前驱结点
    volatile Node prev;
    // 后继结点
    volatile Node next;
    // 结点所对应的线程
    volatile Thread thread;
    // 下一个等待者
    Node nextWaiter;
 
    // 结点是否在共享模式下等待
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
 
    // 获取前驱结点,若前驱结点为空,抛出异常
    final Node predecessor() throws NullPointerException {
        // 保存前驱结点
        Node p = prev;
        if (p == null) // 前驱结点为空,抛出异常
            throw new NullPointerException();
        else // 前驱结点不为空,返回
            return p;
    }
 
    // 无参构造方法
    Node() {    // Used to establish initial head or SHARED marker
    }
 
    // 构造方法
        Node(Thread thread, Node mode) {    // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
 
    // 构造方法
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

每个被阻塞的线程都会被封装成一个Node节点,节点内包含了被阻塞的线程,之后node会被加入到队列。

Node 节点状态

AQS 定义了5个队列中节点状态:

  1. 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
  2. CANCELLED,值为1,表示当前的线程被取消;
  3. SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
  4. CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
  5. PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

类的内部类 ConditionObject

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
 
        private transient Node firstWaiter;
        private transient Node lastWaiter;
 
        public ConditionObject() { }
}

此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下:

public interface Condition {
 
    // 等待,当前线程在接到信号或被中断之前一直处于等待状态
    void await() throws InterruptedException;
 
    // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
    void awaitUninterruptibly();
 
    //等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
    long awaitNanos(long nanosTimeout) throws InterruptedException;
 
    // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
    boolean await(long time, TimeUnit unit) throws InterruptedException;
 
    // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
    boolean awaitUntil(Date deadline) throws InterruptedException;
 
    // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
    void signal();
 
    // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
    void signalAll();
}

重要属性

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // 版本号
    private static final long serialVersionUID = 7373984972572414691L;
    // 头结点
    private transient volatile Node head;
    // 尾结点
    private transient volatile Node tail;
    // 状态
    private volatile int state;
    // 自旋时间
    static final long spinForTimeoutThreshold = 1000L;
 
    // Unsafe类实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // state内存偏移地址
    private static final long stateOffset;
    // head内存偏移地址
    private static final long headOffset;
    // state内存偏移地址
    private static final long tailOffset;
    // tail内存偏移地址
    private static final long waitStatusOffset;
    // next内存偏移地址
    private static final long nextOffset;
    // 静态初始化块
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));
 
        } catch (Exception ex) { throw new Error(ex); }
    }
}

同步状态state

state字段是一个非常重要的字段,可以基于state字段的值定义出不同的同步锁功能,比如:

  1. 基于state 的值实现排他锁 state 值为1代表锁被占用,值为0时代表锁未被占用。 代表类:ReentrantLock
  2. 基于state的值实现读写锁 state 被分成两部分,高16位记录读锁次数,低16位记录写锁次数 代表类:ReentrantReadWriteLock
  3. 基于state的值实现限制线程数 初始化一个state值,表示最大限制数,即可以做到允许最多N个线程同时运行,达到限流效果 代表类:Semaphore
  4. 基于state的值实现倒计数 初始化一个state值,state值为0时触发唤醒动作 代表类:CountDownLatch

支持重写的API

AQS 提供了 5 个可以自定义实现功能的API方法,基于这些方法,则可以实现不同类型的锁功能。

  1. protected boolean tryAcquire(int arg) 尝试一次获得一个排它锁
  2. protected boolean tryRelease(int arg) 尝试一次释放一个排它锁
  3. protected int tryAcquireShared(int arg) 尝试一次获得一个共享锁
  4. protected boolean tryReleaseShared(int arg) 尝试一次释放一个共享锁
  5. protected boolean isHeldExclusively() 验证排它锁是否被占用

提供的模版方法

下面的这些模版方法,都用到了上面可以重写的API方法。

  • 基于tryAcquireAPI提供的模版方法

    1. 获得一个排它锁,直到成功获得锁

      public final void acquire(int arg) {
          if (!tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
    2. 获得一个排它锁,可被中断

      public final void acquireInterruptibly(int arg)
                  throws InterruptedException {
          if (Thread.interrupted())
              throw new InterruptedException();
          if (!tryAcquire(arg))
              doAcquireInterruptibly(arg);
      }
    3. 获得一个排它锁,可超时或中断

      public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                  throws InterruptedException {
          if (Thread.interrupted())
              throw new InterruptedException();
          return tryAcquire(arg) ||
              doAcquireNanos(arg, nanosTimeout);
      }

    其中tryAcquire(arg)方法是需要自己实现的方法

  • 基于tryAcquireShared API提供的模版方法

    1. 获得一个共享锁,直到成功获得锁

      public final void acquireShared(int arg) {
          if (tryAcquireShared(arg) < 0)
              doAcquireShared(arg);
      }

      其中tryAcquireShared(arg)方法是需要自己实现的方法

    2. 获得一个共享锁,可被中断

      public final void acquireSharedInterruptibly(int arg)
                  throws InterruptedException {
          if (Thread.interrupted())
              throw new InterruptedException();
          if (tryAcquireShared(arg) < 0)
              doAcquireSharedInterruptibly(arg);
      }
    3. 获得一个共享锁,支持超时或中断

      public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                  throws InterruptedException {
          if (Thread.interrupted())
              throw new InterruptedException();
          return tryAcquireShared(arg) >= 0 ||
              doAcquireSharedNanos(arg, nanosTimeout);
      }
  • 释放一个排它锁

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    其中tryRelease(arg)方法是需要自己实现的方法

  • 释放一个共享锁

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    其中tryReleaseShared(arg)方法是需要自己实现的方法

其他

还有一个与AQS非常相似的类——AbstractQueuedLongSynchronizer,从命名上来看,多了一个Long,从源码上来看,他们两个有完全相同的结构、属性和方法,唯一不同之处就在于所有与状态相关的参数和结果都定于为long类型,而不是int类型,当需要创建64位状态的同步器(例如多级锁和屏障)时,AbstractQueuedLongSynchronizer类可能很有用。

AQS实现源码分析

  1. 源码分析:同步基础框架之AbstractQueuedSynchronizer(AQS)
  2. 源码分析:①ReentrantLock之公平锁和非公平锁
  3. 源码分析:②ReentrantLock之条件锁Condition
  4. 源码分析:ReentrantReadWriteLock之读写锁
  5. 源码分析:Semaphore之信号量
  6. 源码分析:CountDownLatch 之倒计时门栓
  7. 源码分析:升级版的读写锁 StampedLock
Share this article