menu

reentrantlock

本文转载自:https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

作者:美团技术团队

Java 中的大部分同步类(Semaphore、ReentrantLock 等)都是基于 AbstractQueuedSynchronizer(简称为 AQS)实现的。AQS 是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。

本文会从应用层逐渐深入到原理层,并通过 ReentrantLock 的基本特性和 ReentrantLock 与 AQS 的关联,来深入解读 AQS 相关独占锁的知识点,同时采取问答的模式来帮助大家理解 AQS。由于篇幅原因,本篇文章主要阐述 AQS 中独占锁的逻辑和 Sync Queue,不讲述包含共享锁和 Condition Queue 的部分(本篇文章核心为 AQS 原理剖析,只是简单介绍了 ReentrantLock,感兴趣同学可以阅读一下 ReentrantLock 的源码)。

1 ReentrantLock

1.1 ReentrantLock 特性概览

ReentrantLock 意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。为了帮助大家更好地理解 ReentrantLock 的特性,我们先将 ReentrantLock 跟常用的 Synchronized 进行比较,其特性如下(蓝色部分为本篇文章主要剖析的点):

下面通过伪代码,进行更加直观的比较:

// **************************Synchronized的使用方式**************************
// 1.用于代码块
synchronized (this) {}
// 2.用于对象
synchronized (object) {}
// 3.用于方法
public synchronized void test () {}
// 4.可重入
for (int i = 0; i < 100; i++) {
  synchronized (this) {}
}
// **************************ReentrantLock的使用方式**************************
public void test () throw Exception {
  // 1.初始化选择公平锁、非公平锁
  ReentrantLock lock = new ReentrantLock(true);
  // 2.可用于代码块
  lock.lock();
  try {
    try {
      // 3.支持多种加锁方式,比较灵活; 具有可重入特性
      if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
    } finally {
      // 4.手动释放锁
      lock.unlock()
    }
  } finally {
    lock.unlock();
  }
}

1.2 ReentrantLock 与 AQS 的关联

通过上文我们已经了解,ReentrantLock 支持公平锁和非公平锁(关于公平锁和非公平锁的原理分析,可参考《不可不说的 Java“锁”事》),并且 ReentrantLock 的底层就是由 AQS 来实现的。那么 ReentrantLock 是如何通过公平锁和非公平锁与 AQS 关联起来呢? 我们着重从这两者的加锁过程来理解一下它们与 AQS 之间的关系(加锁过程中与 AQS 的关联比较明显,解锁流程后续会介绍)。

非公平锁源码中的加锁流程如下:

// java.util.concurrent.locks.ReentrantLock#NonfairSync

// 非公平锁
static final class NonfairSync extends Sync {
  ...
  final void lock() {
    if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
    else
      acquire(1);
    }
  ...
}

这块代码的含义为:

  • 若通过 CAS 设置变量 State(同步状态)成功,也就是获取锁成功,则将当前线程设置为独占线程。
  • 若通过 CAS 设置变量 State(同步状态)失败,也就是获取锁失败,则进入 Acquire 方法进行后续处理。

第一步很好理解,但第二步获取锁失败后,后续的处理策略是怎么样的呢?这块可能会有以下思考:

  • 某个线程获取锁失败的后续流程是什么呢?有以下两种可能:

(1) 将当前线程获锁结果设置为失败,获取锁流程结束。这种设计会极大降低系统的并发度,并不满足我们实际的需求。所以就需要下面这种流程,也就是 AQS 框架的处理流程。

(2) 存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。

  • 对于问题 1 的第二种情况,既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
  • 处于排队等候机制中的线程,什么时候可以有机会获取锁呢?
  • 如果处于排队等候机制中的线程一直无法获取锁,还是需要一直等待吗,还是有别的策略来解决这一问题?

带着非公平锁的这些问题,再看下公平锁源码中获锁的方式:

// java.util.concurrent.locks.ReentrantLock#FairSync

static final class FairSync extends Sync {
  ...
  final void lock() {
    acquire(1);
  }
  ...
}

看到这块代码,我们可能会存在这种疑问:Lock 函数通过 Acquire 方法进行加锁,但是具体是如何加锁的呢?

结合公平锁和非公平锁的加锁流程,虽然流程上有一定的不同,但是都调用了 Acquire 方法,而 Acquire 方法是 FairSync 和 UnfairSync 的父类 AQS 中的核心方法。

对于上边提到的问题,其实在 ReentrantLock 类源码中都无法解答,而这些问题的答案,都是位于 Acquire 方法所在的类 AbstractQueuedSynchronizer 中,也就是本文的核心——AQS。下面我们会对 AQS 以及 ReentrantLock 和 AQS 的关联做详细介绍(相关问题答案会在 2.3.5 小节中解答)。

2 AQS

首先,我们通过下面的架构图来整体了解一下 AQS 框架:

  • 上图中有颜色的为 Method,无颜色的为 Attribution。
  • 总的来说,AQS 框架共分为五层,自上而下由浅入深,从 AQS 对外暴露的 API 到底层基础数据。
  • 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的 API 进入 AQS 内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

下面我们会从整体到细节,从流程到方法逐一剖析 AQS 框架,主要分析过程如下:

2.1 原理概览

AQS 核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH:Craig、Landin and Hagersten 队列,是单向链表,AQS 中的队列是 CLH 变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

主要原理图如下:

AQS 使用一个 Volatile 的 int 类型的成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取的排队工作,通过 CAS 完成对 State 值的修改。

2.1.1 AQS 数据结构

先来看下 AQS 中最基本的数据结构——Node,Node 即为上面 CLH 变体队列中的节点。

解释一下几个方法和属性值的含义:

方法和属性值 含义
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
predecessor 返回前驱节点,没有的话抛出 npe
nextWaiter 指向下一个处于 CONDITION 状态的节点(由于本篇文章不讲述 Condition Queue 队列,这个指针不多介绍)
next 后继指针

线程两种锁的模式:

模式 含义
0 当一个 Node 被初始化的时候的默认值
CANCELLED 为 1,表示线程获取锁的请求已经取消了
CONDITION 为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE 为-3,当前线程处在 SHARED 情况下,该字段才会使用
SIGNAL 为-1,表示线程已经准备好了,就等资源释放了

2.1.2 同步状态 State

在了解数据结构后,接下来了解一下 AQS 的同步状态——State。AQS 中维护了一个名为 state 的字段,意为同步状态,是由 Volatile 修饰的,用于展示当前临界资源的获锁情况。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private volatile int state;

下面提供了几个访问这个字段的方法:

方法名 描述
protected final int getState() 获取 State 的值
protected final void setState(int newState) 设置 State 的值
protected final boolean compareAndSetState(int expect, int update) 使用 CAS 方式更新 State

这几个方法都是 Final 修饰的,说明子类中无法重写它们。我们可以通过修改 State 字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。

对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是 AQS 架构图中的第一层:API 层。

2.2 AQS 重要方法与 ReentrantLock 的关联

从架构图中可以得知,AQS 提供了大量用于自定义同步器实现的 Protected 方法。自定义同步器实现的相关方法也只是为了通过修改 State 字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法(ReentrantLock 需要实现的方法如下,并不是全部):

方法名 描述
ReentrantLock 使用 AQS 保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock 记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。
Semaphore 使用 AQS 同步状态来保存信号量的当前计数。tryRelease 会增加计数,acquireShared 会减少计数。
CountDownLatch 使用 AQS 同步状态来表示计数。计数为 0 时,所有的 Acquire 操作(CountDownLatch 的 await 方法)才可以通过。
ReentrantReadWriteLock 使用 AQS 同步状态中的 16 位保存写锁持有的次数,剩下的 16 位用于保存读锁的持有次数。
ThreadPoolExecutor Worker 利用 AQS 同步状态实现对独占线程变量的设置(tryAcquire 和 tryRelease)。

4.3 自定义同步工具

了解 AQS 基本原理以后,按照上面所说的 AQS 知识点,自己实现一个同步工具。

public class LeeLock  {

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire (int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease (int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively () {
            return getState() == 1;
        }
    }

    private Sync sync = new Sync();

    public void lock () {
        sync.acquire(1);
    }

    public void unlock () {
        sync.release(1);
    }
}

通过我们自己定义的 Lock 完成一定的同步功能。

public class LeeMain {

    static int count = 0;
    static LeeLock leeLock = new LeeLock();

    public static void main (String[] args) throws InterruptedException {

        Runnable runnable = new Runnable() {
            @Override
            public void run () {
                try {
                    leeLock.lock();
                    for (int i = 0; i < 10000; i++) {
                        count++;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    leeLock.unlock();
                }

            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(count);
    }
}

上述代码每次运行结果都会是 20000。通过简单的几行代码就能实现同步功能,这就是 AQS 的强大之处。

5 总结

我们日常开发中使用并发的场景太多,但是对并发内部的基本框架原理了解的人却不多。由于篇幅原因,本文仅介绍了可重入锁 ReentrantLock 的原理和 AQS 原理,希望能够成为大家了解 AQS 和 ReentrantLock 等同步器的“敲门砖”。

参考资料

  • Lea D. The java. util. concurrent synchronizer framework[J]. Science of Computer Programming, 2005, 58(3): 293-309.
  • 《Java 并发编程实战》
  • 不可不说的 Java“锁”事

评论:


技术文章推送

手机、电脑实用软件分享

微信搜索公众号: AndrewYG的算法世界
wechat 微信公众号:AndrewYG的算法世界