AQS的使用、实现原理和源码导读

1. Why -为什么有这篇文章

前几天,我写了一篇《 手把手带你写java同步器,理解AQS 》;而这几天我刚好在读1999年Doug Lea出版的《Concurrent Programming in Java : Design Principles and Patterns (Second Edition)》. 1999年的时候,java还处于1.2的版本阶段,java.util.concurrent包也许Doug Lea还未动手开始写。不过这本书描述的一些设计思想和原则,可以窥见在那个时候,Doug Lea这批人已经在讨论并抽象出并发的一些抽象设计原则,而且还提到了异步事件机制。

回过来再看看java.util.concurrent.AbstractQueuedSynchronizer, 应该可以说是Doug Lea书中所提到的一些思想的很好的一个实现样例。所以我觉得这个时候去看看AQS的源码实现,能更好的理解其中的某些精髓。

2. What-AQS是什么

从名字AbstractQueuedSynchronizer我们可以看出来:

  • 这是一个抽象类
  • 这是一个同步器
  • 这里面用到了队列

同时,由于他是一个抽象类,那么他一定抽象了某些公共行为,然后留下了供我们扩展的方法。所以我们可以大胆的给出一句话来描述AQS:AQS是一个抽象的同步器,它采用队列的方式来缓存未能获取锁的线程。从而保证再并发情况下,能保护好临界资源(能被共同访问的资源)
但是我们点进去AbstractQueuedSynchronizer虽然是一个抽象类,但是却没有抽象方法。原因在于这里的abstract关键字仅仅是为了防止你直接new AbstractQueuedSynchronizer,你需要继承AbstractQueuedSynchronizer,然后才能实例化你自己的子类。这么做有什么意义呢?打开AbstractQueuedSynchronizer,我们发现有这么几个方法没有任何实现,仅仅只是抛出异常:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

这样一来就很好理解了,注意上面的方法有tryAcquire和tryAcuireShared以及对应的release方法。也就是说AQS的子类,可以是排他锁,也可以是共享锁。如果AQS把这些方法都不做任何实现,那不是我的子类全都要去实现一遍?如果我只是想定义一个排他锁,结果我还得把共享锁的tryAcuireShared和release也实现一遍。

3. How- 怎么使用及它是怎么实现的

我们知道了AQS是什么东西,接下来就是要知道怎么使用,以及它关键的实现原理。

3.1 Usage-使用

既然AQS是一个抽象类,要使用它就必然需要去继承它。比如我定义了一个排他的(互斥)同步器,SimpleExclusiveLock

public class SimpleExclusiveLock extends AbstractQueuedSynchronizer {

}

上文讲到有几个tryAcuire和release方法是需要我们实现的,由于我们要做的是排他锁,所以我们需要实现tryAcquire和tryRelease.这里我简单直接摘取ReentrantLock中的实现,如下:


public class SimpleExclusiveLock extends AbstractQueuedSynchronizer { /** * 尝试获取 * @param acquires * @return */ @Override protected boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int state = getState(); if (state == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = state + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * 尝试释放 * @param releases * @return */ @Override protected boolean tryRelease(int releases) { int state = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (state == 0) { free = true; setExclusiveOwnerThread(null); } setState(state); return free; } /** * 当前线程是不是锁的持有者 * @return */ @Override protected boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } }

以上代码实现了尝试获取锁、尝试释放锁以及判断当前线程是否是锁的持有者的方法。还有一个问题,这几个方法都是protected的,我作为一个同步器,我得向外提供public方法作为接口吧。打开AQS,找public方法,发现了两个:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

我们发现这个acquire和release方法调用了我们刚刚实现地tryAcquire和tryRelease.很明显,这是一个模板方法模式(某个功能需要N步,建立调用模板-acquire的实现,留下其中的i步作为扩展)。
那么我们直接调用acuqire和release就可以使用这个SimpleExclusiveLock了,但是为了进一步封装,调用者不需要关心那个arg参数是什么,我们再SimpleExclusiveLock中定义如下两个方法,其中的1我们后面会讲到:

public void lock(){
    acquire(1);
}

public void unlock(){
    release(1);
}

接着我们用《 手把手带你写java同步器,理解AQS 》中的那个库存竞争的例子来测试一下这个同步器:

@RestController
@RequestMapping("/")
public class TestController {
    Logger logger = LoggerFactory.getLogger(TestController.class);

    private static volatile int storage = 5;
    private static SimpleExclusiveLock lock = new SimpleExclusiveLock();
    @RequestMapping("test")
    public String test(){
        lock.lock();
        if(storage>0){
            storage = storage - 1;
           logger.info("库存扣减成功,剩余:"+storage);
        }else{
           logger.info("库存不足"+storage);
        }
        lock.unlock();
       return String.valueOf(storage);
    }
}

用JMeter压测后发现同步器起作用,并没有发生超卖的现象:

2020-03-13 14:58:18.983  INFO 36816 --- [nio-8080-exec-3] c.k.demo.controller.TestController       : 库存扣减成功,剩余:4
2020-03-13 14:58:18.983  INFO 36816 --- [nio-8080-exec-2] c.k.demo.controller.TestController       : 库存扣减成功,剩余:3
2020-03-13 14:58:18.984  INFO 36816 --- [nio-8080-exec-4] c.k.demo.controller.TestController       : 库存扣减成功,剩余:2
2020-03-13 14:58:18.984  INFO 36816 --- [nio-8080-exec-1] c.k.demo.controller.TestController       : 库存扣减成功,剩余:1
2020-03-13 14:58:19.022  INFO 36816 --- [nio-8080-exec-5] c.k.demo.controller.TestController       : 库存扣减成功,剩余:0
2020-03-13 14:58:19.072  INFO 36816 --- [nio-8080-exec-7] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.124  INFO 36816 --- [nio-8080-exec-9] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.173  INFO 36816 --- [io-8080-exec-11] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.223  INFO 36816 --- [io-8080-exec-13] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.272  INFO 36816 --- [io-8080-exec-15] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.322  INFO 36816 --- [io-8080-exec-17] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.373  INFO 36816 --- [io-8080-exec-19] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.422  INFO 36816 --- [io-8080-exec-21] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.472  INFO 36816 --- [io-8080-exec-23] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.524  INFO 36816 --- [io-8080-exec-25] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.573  INFO 36816 --- [io-8080-exec-27] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.625  INFO 36816 --- [io-8080-exec-29] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.674  INFO 36816 --- [io-8080-exec-31] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.723  INFO 36816 --- [io-8080-exec-33] c.k.demo.controller.TestController       : 库存不足0
2020-03-13 14:58:19.772  INFO 36816 --- [io-8080-exec-35] c.k.demo.controller.TestController       : 库存不足0

3.2 Structure 整体结构及概要原理

3.2.1 AQS整体类图

我们在3.1 里面已经了解如何通过继承AQS,并且复写关键方法,来实现自己的同步器。注意到在AQS的acquire方法中的实现:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这里涉及到了几个关键结构:queue, Node. 打开AQS的类图:

3.2.2 内部类

发现AQS继承自AbstractOwnableSyncchronizer,并且自己有两个内部类:ConditionObject implements Condition以及Node.我们逐个展开来说:
* AbstractOwnableSynchronizer:

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private transient Thread exclusiveOwnerThread;
    //get set constructor...
}

就定义了一个属性,锁的拥有者(线程)。这个属性保存了当前同步器被那个线程占用着。
* ConditionObject implements Condition:

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

condition这个接口定义了几个await和signal方法,主要的目的是用来对当前线程进行阻塞,以及对指定线程进行唤醒。ConditionObject作为其实现,这些await和signal其实是对LockSupport的park和unpark方法的封装。可以看到ConditionObject只有两个属性:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

根据注释,我们可以猜测,它用Node节点组成链表来模拟一个队列。这里叫做conditin queue,条件队列。
* Node:

可以看到Node其实是Thread的持有者,每个Node都有对应的线程;同时,Node还有waitStatus以及其相关常量,用来描述Node当前的等待状态;另外,Node还有prev和next用来构建链表.

3.2.3 AQS本身具有属性和方法


* 属性

    /**同步队列头,只能用setHead方法修改。如果已经有队头了,
     *队头的waitStatus(见ConditionObject)保证它不会被CANCELED
     */
    private transient volatile Node head;
    //同步队列尾,只能用enq方法修改
    private transient volatile Node tail;
    //同步状态,本类均使用CAS对其进行赋值
    private volatile int state;
    //直接自旋还是park的阈值,单位为纳秒
    static final long spinForTimeoutThreshold = 1000L;

    /**
     * 用于CAS的一些初始化变量
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
  • 关键方法
    关键方法可以直接翻AQS源码,这里仅对他们进行简单分类描述
    CAS操作, 包含对AQS已经ConditionObject的一些属性进行CAS赋值
    队列操作, 包含采用CAS操作进行出入队列的封装.例如enq和addWaiter
    获取和释放操作,对外提供的获取同步器和释放同步器操作。例如acquire和release。

3.2.4 两个队列

从前面的内容中,我们提到了连个和队列有关的东西:同步队列、条件队列。这两个队列是用于不同的场景的。当你只是简单的实现一个锁,用来加锁和解锁,那么只需要用到同步队列。而如果需要实现带有条件的阻塞,一旦条件满足则开始获取锁,比如实现阻塞队列(当空的时候不能take,当满的时候不能put),就需要用到条件队列。
* 同步队列(存放等待获取锁的线程
在用AQS实现锁的时候,同一时间只能有一个线程获取到锁(非共享情况),那么没有获取到锁的线程就需要找个数据结构保存起来,这里就是同步队列了。它的实现,其实是采用双向链表的方式。上一节的AQS具有的几个属性其实已经说明了这个事情。它主要存放的是等待获取锁的线程。
一个简要的示意图(不是实际的实现,后续给出实际的实现):

到这里,需要有一个印象,同步队列是双向链表实现的。
* 条件队列(存放等待某一个条件被满足的线程
当我们要实现一个带有阻塞功能的队列,我们需要用到Condition,而前面我们已经知道Condition接口在AQS中有一个内部类对其进行了实现。其中关键的方法就是await和signal.
那么这个条件队列是什么意思呢?它主要保存的是等待某一个条件满足的线程。
而且,由于这是AQS中的一个内部类,而不是一个属性,所以一个AQS中可能会对应多个条件队列。
简单的示意图如下:

* 条件队列和同步队列的关系
当我们在实现一个BlockingQueue的时候,节点在条件队列和同步队列中转移的大致过程如下:
节点需要先获取到锁资源,从而这里可能需要入同步队列进行排队获取锁;
当节点获取到锁资源后,操作add,发现队列满了,然后又需要阻塞await并且从同步队列拿出来放入条件队列中;
当其他线程调用“队列满了”这个codnition的signal方法后,这个节点又从条件队列中出来去参与锁竞争,竞争失败可能又到同步队列。

转换过程需我花了一个下午才想明白,所以希望上面几句话能帮助理解

3.3 Source code trace 跟读源码了解具体实现

我们已经了解了AQS的主要结果,以及大致原理。接下来我们更深入一步,跟着源代码来看看AQS的队列到底是怎么操作的,即获取锁和释放锁的过程。

3.3.1 获取锁的过程

  • 入口
public final void acquire(int arg) {
        if (!tryAcquire(arg) && //尝试获取锁
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//添加节点到同步队列
            //当前线程的interrupt状态置反,这个的作用可以单独开一篇文章
            //在acquire方法中,下面的操作是没有用的,因为代码执行到这里的时候已经没有阻塞了
            selfInterrupt();
    }
  • tryAcquire(int)
    这个方法由AQS的子类进行实现,比如我们实现的SimpleExclusiveLock中:
@Override
    protected boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int state = getState();
        if (state == 0) {//当前AQS的状态为可以获取
            if (!hasQueuedPredecessors() &&//是否需要排队
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);//CAS成功,当前线程成功获取锁,标记持有者
                return true;
            }
        }
        //为了支持重入,如果是当前线程持有的锁,直接增加state的值
        else if (current == getExclusiveOwnerThread()) {
            int nextc = state + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        //没有获取到锁
        return false;
    }
  • addWaiter(Node node)方法
    主要是构建一个新的node放入同步队列中
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;//尾节点
        /**队列采用懒加载,第一个排队的线程进来前整个队列没有节点,这里pred!=null意思是已经初始         *化过队列了
         */
        if (pred != null) {
            //加入队尾,这里只是为了快速尝试一次,万一成功了呢
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //这里才是入队列的主方法
        enq(node);
        return node;
    }
  • acquireQueued(Node,int)
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //死循环park,等unpark的时候可以从这里开始执行
            for (;;) {
                final Node p = node.predecessor();
                /**
                *如果当前线程是第一个排队的再次尝试获取锁,
                *因为持有锁的线程可能已经释放了,防止无意义的park消耗资源
                */
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果获取锁失败需要park,则park. 对interrupted的操作是为了和
                //acquireInterruptly共用代码
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

第一层级的代码已经过完了,现在进入第二层级
* tryAcquire中调用的,hasQueuedPredecessors是否需要排队

//hasQueuedPredecessors队列是否还有前面在排队的
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t && // 首尾不等表明有排队的
            //头节点的下一个是空的或者头节点就是当前节点都表明队列有排队的
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

该方法的方法注释上Doug Lea有标明,此方法的语义和下面的调用等价,只是更有效率:

//队列第一个线程不是当前线程,并且队列中有线程
getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()

队列中有线程,则需要排队很好理解。队列中第一个线程不是当前线程怎么理解呢?其实是因为在acquireQueued()中被unpark的那个线程进入下一次自旋,也会调用tryAcquire().这个时候它就是队列第一个等待的节点啊,从它的视角来说,当然不需要再排队了。
* enq(Node)入队列方法

    private Node enq(final Node node) {
        //自旋CAS设置上下节点,达到入队列目的
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //懒加载的体现,第一次进来发现没有初始化,会新建Thread=null的节点,
                //代表目前持有锁的线程new Node()
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
  • shouldParkAfterFailedAcquire(Node,Node)判断失败后是否要park的方法
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//检查前一个节点的waitStatus
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            //只有前一个节点为此状态-1的时候,才返回true
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            //这种情况为前一个节点已经CANCEL状态,继续往前搜索
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //这里只是把前一个节点置为-1,表明前一个节点正在运行
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

3.3.2 释放锁的过程

AQS中的release模板方法

    public final boolean release(int arg) {
        if (tryRelease(arg)) {//调用子类实现的tryRelease
            Node h = head;
            //头节点不存在,表明还没初始化队列,不存在竞争
            //头节点的等待状态为!=0表明另一个线程的shouldParkAfterFailedAcquire还未执行
            //这两种情况都不需要唤醒队列中的元素
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//唤醒unpark队列中的元素
            return true;
        }
        return false;
    }
  • tryRelease 子类实现的tryRelease
    protected boolean tryRelease(int releases) {
        //减少重入次数
        int state = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        //如果重入次数为0,则置空锁持有者
        if (state == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        //设置状态
        setState(state);
        return free;
    }
  • 唤醒队列第一个等待的Node
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //通常下一个节点就是需要被unpark的,但是有可能它取消了。这种情况,从尾部
        //反向遍历,找到实际上没有被取消的那个节点,将它unpark
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

至此,AQS的结构,加锁、解锁过程已经说完了。还剩下条件队列相关的源码导读,这部分准备单开文章进行记录。

4 Conclusion总结

  • 我们可以继承AQS来实现自己的同步器
  • AQS包含两个队列:条件队列和同步队列;本文详细描述了同步队列的工作过程和源码导读
  • AQS的state在不同的子类中可能表示不同的意义,本文的实现,是作为锁标识和重入次数。
  • AQS主要的思想包含三个重点:自旋,CAS, LockSupport.park unpark

5 后续学习

后续准备读一下AQS各个子类的具体实现,以及它们的使用场景。包括ReentrantLock,CountDownLatch, LimitLatcch, Semaphore, ReentrantReadWriteLock, ThreadPoolExecutor等。

AQS的使用、实现原理和源码导读

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

滚动到顶部