Skip to content

源码分析:①ReentrantLock之公平锁和非公平锁

Published: at 17:46:04

简介

ReentrantLock 是JDK 1.5开始提供的一种可重入的互斥锁,并且构造方法支持公平性参数。

源码分析

类结构体系

ReentrantLock实现了Lock接口:

public class ReentrantLock implements Lock, java.io.Serializable {
...
}

Lock接口中定义了6个方法,需要自己去实现:

public interface Lock {
	// 获得锁
	void lock();
	// 可被中断的获得锁
	void lockInterruptibly() throws InterruptedException;
	// 尝试获取锁(如果可用),并立即返回值true。如果锁不可用,则此方法将立即返回值false
	boolean tryLock();
	// 如果锁可用,此方法将立即返回值true,如果锁不可用,则当前线程将处于休眠状态
	boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
	// 解锁
	void unlock();
	// 条件锁
	Condition newCondition();
}

重要的内部类

ReentrantLock 有3个重要的内部类,分别是 Sync、NonfairSync、FairSync;

  1. Sync 是后面两个的父类,继承至AbstractQueuedSynchronizer
  2. NonfairSync和FairSync都继承至Sync
  3. NonfairSync 主要用于实现非公平锁,FairSync 主要用于实现公平锁

重要的属性

ReentrantLock 就一个属性,就是sync,在构造方法中初始化,通过构造方法参数决定使用公平锁还是非公平锁实现。

private final Sync sync;

两个构造方法

无参构造方法构造非公平锁:

public ReentrantLock() {
    sync = new NonfairSync();
}

有参构造方法构造公平锁:

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

获得锁:lock()

获得锁的主要代码

public void lock() {
    sync.lock();
}

从上面代码可以看出,锁的实现主要是在sync里面,而sync的实现有两个,分为公平和非公平锁,所以这里要分别看两种情况下不同的实现。

ReentrantLock lock = new ReentrantLock(); 或 ReentrantLock lock = new ReentrantLock(true);

公平获得锁

sync.lock() 最终会调用FairSync.lock()里面的实现,FairSync中获得锁的对应源码如下:

static final class FairSync extends Sync {
    // 以公平的方式锁
    final void lock() {
        // 调用AQS框架的逻辑
        acquire(1);
    }
    // AQS acquire 方法会调用tryAcquire这个方法
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&  compareAndSetState(0, acquires)) {
                // 成功获得锁,设置锁的所有者为当前线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            // 可重入锁的逻辑
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

其中lock()方法中的acquire(1)方法会调用AQS框架中的实现,AQS框架中的acquire(int)方法是被final修饰的,不能被继承修改,这个方法会继续调用FairSync.tryAcquire()方法。

AQS.acquire() 方法实现如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire() 的逻辑可以总结为:

  1. 尝试获得锁,也就是调用tryAcquire() 方法,成功原子修改state字段标识成功获得锁。
  2. 如果没有获得锁,尝试将当前线程加入到队列addWaiter(node)
  3. 再次尝试获得锁acquireQueued(node,arg)

acquire() 里面的逻辑只有tryAcquire是在 ReentrantLock 中实现的,其他像addWaiter、acquireQueued的分析请看关于AQS的分析文章

刚刚说了,tryAcquire(int)的逻辑实际上就是修改state字段,修改成功就是获得锁

分析上面tryAcquire(int)源码,总结主要逻辑有如下过程:

  1. 获取当前线程
  2. 获取当前state值
    • 如果state为0,说明锁资源空闲,当前没有其他线程获得该锁,当前线程可以获得该锁,获得锁过程如下:
      1. 首先调用hasQueuedPredecessors()方法检查是否还有等待获取锁的时间更长的线程
      2. 没有更早的其他线程排队,就尝试调用CAS方法compareAndSetState(0, acquires)原子修改state值
      3. CAS 原子修改成功,代表当前线程成功获得锁,之后调用setExclusiveOwnerThread(current);设置获得锁的所有者为当前线程
      4. 成功获得锁,返回true
    • 如果state不为0,说明锁资源已经被线程获取了,也有可能是当前线程自己获得了锁资源
      1. 如果锁的所有者是当前线程,则state值+1(不需要使用CAS方式修改,因为之前已经获得了锁,现在是重入,只需要计数+1),这也就是可重入锁的逻辑,成功获得锁,返回true
  3. 没有获得锁,返回false
  4. 之后就是AQS里面的逻辑了,排队、阻塞、等待唤醒获取锁等过程,过程请看之前关于AQS的分析文章

非公平获得锁

非公平获得锁时,sync.lock() 最终会调用NonfairSync.lock()里面的实现,NonfairSync的源码如下:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        // 非公平获得锁,进来直接使用CAS修改,修改成功就是获得锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // acquire 是AQS里面的方法,最终会调用到非公平锁的实现方法tryAcquire
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        // 非公平获得锁
        return nonfairTryAcquire(acquires);
    }
}

源码分析:

  1. 进来直接尝试CAS修改state值,如果修改成功,代表获得锁,然后设置获得锁的所有者为当前线程
  2. CAS修改state失败,调用 acquire(1) 逻辑,最终会调用nonfairTryAcquire(acquires)

nonfairTryAcquire(acquires) 的逻辑是在Sync里面的,主要实现源码如下:

abstract static class Sync extends AbstractQueuedSynchronizer {
...
	final boolean nonfairTryAcquire(int acquires) {
      // 当前线程
      final Thread current = Thread.currentThread();
      // state 状态
      int c = getState();
      if (c == 0) {
          // CAS 修改state 值
          if (compareAndSetState(0, acquires)) {
              // 成功获得锁,修改锁的所有者线程
              setExclusiveOwnerThread(current);
              return true;
          }
      } else if (current == getExclusiveOwnerThread()) {
          // 检查锁的所有者是否是当前线程
          // 当前线程获得锁,可重入逻辑
          int nextc = c + acquires;
          if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
          setState(nextc);
          return true;
      }
      return false;
  }
...
}

源码分析:

  1. 获取当前线程,当前 state 值
  2. 如果 state 值为0 ,直接使用CAS 修改,修改成功代表获得锁,如果成功获得锁,修改锁的所有者线程为当前线程
  3. 如果 state 值不为0 ,检查锁的所有者是否是当前线程,如果是,进入可重入逻辑,state值+1 ,成功获得锁
  4. 没有获得锁,返回false

通过比较公平获得锁和非公平获得锁的实现逻辑,可以发现他们的主要区别如下:

  1. 非公平获得锁进入时,直接使用CAS尝试获得锁
  2. 公平获得锁时,CAS尝试获得锁之前会检查是否还有等待获取锁的时间更长的线程,也就是hasQueuedPredecessors()的 逻辑。

可被中断的获得锁:lockInterruptibly()

获得锁,除非当前线程被中断。该方法获得的锁也是支持可重入的锁,与lock()方法获得锁的区别就在于该方法获得锁时被中断会抛出InterruptedException异常。

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 自旋
        for (;;) {
            final Node p = node.predecessor();
            // 前驱节点是头结点才尝试获得锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&  parkAndCheckInterrupt())
                // 这个API支持被中断,所以抛出了异常
                // 像lock() 等方法在这里是不会抛出异常的,只是标识了下被中断
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // 被取消的节点
            cancelAcquire(node);
    }
}

尝试获取锁:tryLock()

尝试获取锁(如果可用),并立即返回值true。如果锁不可用,则此方法将立即返回值false。

源码如下:

public boolean tryLock() {
		// 直接调用的和非公平方式获得锁tryAcquire一样的逻辑,没有获得锁会立即返回false
    return sync.nonfairTryAcquire(1);
}

abstract static class Sync extends AbstractQueuedSynchronizer {
...
  final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
          if (compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
          }
      }
      else if (current == getExclusiveOwnerThread()) {
          int nextc = c + acquires;
          if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
          setState(nextc);
          return true;
      }
      return false;
  }
...
}

从上面源码可以看出,tryLock()方法是直接调用的Sync.nonfairTryAcquire(int) 方法,该方法是NonfairSync.tryAcquire()方法的默认实现,具体的分析见上面非公平获得锁的分析;所以就算你在ReentrantLock 构造方法传入true,tryLock() 还是以非公平的方式获得锁

尝试获取锁:tryLock(time, unit)

如果锁可用,此方法将立即返回值true,如果锁不可用,则当前线程将处于休眠状态,等待指定的时间内获得锁返回true,否则返回false。

获得锁的过程中,当前线程被中断,会抛出InterruptedException异常。

源码如下:

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

AQS代码:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        // 当前线程被中断,抛出异常
        throw new InterruptedException();
    // 尝试获得锁  || 等待指定时间获得不断尝试获得锁
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 计算获得锁的最后期限时间
    final long deadline = System.nanoTime() + nanosTimeout;
    // 当前节点入队列
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 自旋
        for (;;) {
            final Node p = node.predecessor();
            // 当前节点的前驱节点是头结点  才去尝试获得锁
            if (p == head && tryAcquire(arg)) {
                // 获得锁,设置新的头结点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                // 判断是否已经过了最后期限时间,没有获得锁,直接返回false
                return false;
            // 判断是否要阻塞,spinForTimeoutThreshold = 1000 ,相当于允许1000纳秒的误差
            if (shouldParkAfterFailedAcquire(p, node) &&  nanosTimeout > spinForTimeoutThreshold)
                // 继续阻塞线程
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                // 被中断了,抛出异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

源码分析:

  1. 当前线程如果被中断,则立马抛出异常
  2. 尝试获得锁,如果成功获得锁,立即返回true
  3. 以当前线程为节点,加入到等待队列
  4. 再次尝试获得锁,如果成功获得锁,立即返回true
  5. 判断是否已经过了最后期限时间,如果过了期限时间,立即返回false
  6. 调用LockSupport.parkNanos(this, nanosTimeout); 阻塞线程,实际上也就是调用Unsafe类的park
  7. 当前线程如果被中断,则立马抛出异常
  8. 自旋,等待线程被唤醒,重复步骤4~7

注意:从上面源码final Node p = node.predecessor();p == head && tryAcquire(arg)可以看出,在公平模式下,只要有其他更早的线程在排队还没有获得锁,该线程就不可能立马获得锁

解锁:unlock()

释放锁,如果当前线程是锁的持有者,则state减一,如果state为0,则锁被释放。

源码如下:

// ReentrantLock 代码:
public void unlock() {
    sync.release(1);
}
// AQS 代码:
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 如果存在,唤醒下一个节点线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// ReentrantLock 内部类 Sync代码:
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

源码分析:

  1. 如果当前线程是锁的持有者,则state减一
  2. state减为0,表示锁成功释放,设置锁的持有者线程为null
  3. 如果锁释放成功,如果队列中有等待的线程,唤醒下一个线程获取锁

条件锁:Condition

篇幅有限,条件锁在另一篇文章分析。

总结

  1. ReentrantLock 可以实现公平锁和非公平锁,默认是非公平模式
  2. 公平锁和非公平锁的主要区别是:非公平锁在刚获取锁的时候会直接尝试一次CAS修改同步状态,不会管队列中是否有排队等待锁的线程,修改成功就获得锁;公平锁就相反,没有直接CAS修改这一步,而是要去检查队列中是否有更早在排队的线程。
  3. ReentrantLock 只完成加锁(可重入)和解锁的过程,其他功能如排队入队,阻塞,唤醒下一个线程,中断异常等都是在AQS里面实现的