Why-为什么有这篇文章
不知道大家有没有在面试中遇到这样的问题:“你有用过java.util.concurrent包下面的哪些工具类?能挑两个说说它的实现么?”
java.util.concurrent包下面,Doug Lea为我们广大java语言的使用者提供了很多工具类,比如CountDownLatch, CyclicBarrier, ReetrantLock等。翻一翻它们的源码,就可以看到大致的实现都逃不过需要进行同步处理,而java中有个关键字synchronized就可以达到这个目的。不过synchronized具有很多局限性(比如需要传入一个对象作为同步标识;而且它是非公平的),所以就有了Doug Lea所写的这些并发工具类。
这篇文章的目的就是为了对这些并发工具类都具有的同步处理手法进行一个概括,然后简要地自己动手写一个同步器。这样我们再去看AQS或者ConcurrentLinkedQueue等源码地时候,能更方便的理解。
What-我们要实现的是什么
问题场景
清楚地定义问题,问题就被解决了一半。所以我们先好好讨论一下同步器需要做什么。为此,我们想想一下这个场景:
@RestController
@RequestMapping("/")
public class TestController {
Logger logger = LoggerFactory.getLogger(TestController.class);
private static volatile int storage = 5;
@RequestMapping("test")
public String test(){
if(storage>0){
storage = storage - 1;
logger.info("库存扣减成功,剩余:"+storage);
}else{
logger.info("库存不足"+storage);
}
return String.valueOf(storage);
}
}
假设我们有N个线程,要去对一个变量storage = 5进行扣减操作。比如N = 20的时候,会发生什么情况呢?
我用Jmeter模拟20个线程调用上述的controller后,以下是输出结果:
2020-03-11 19:11:28.870 INFO 35616 --- [nio-8080-exec-1] c.k.demo.controller.TestController : 库存扣减成功,剩余:2
2020-03-11 19:11:28.870 INFO 35616 --- [nio-8080-exec-3] c.k.demo.controller.TestController : 库存扣减成功,剩余:2
2020-03-11 19:11:28.870 INFO 35616 --- [nio-8080-exec-4] c.k.demo.controller.TestController : 库存扣减成功,剩余:2
2020-03-11 19:11:28.870 INFO 35616 --- [nio-8080-exec-5] c.k.demo.controller.TestController : 库存扣减成功,剩余:2
2020-03-11 19:11:28.870 INFO 35616 --- [nio-8080-exec-2] c.k.demo.controller.TestController : 库存扣减成功,剩余:2
2020-03-11 19:11:28.890 INFO 35616 --- [nio-8080-exec-6] c.k.demo.controller.TestController : 库存扣减成功,剩余:1
2020-03-11 19:11:28.941 INFO 35616 --- [nio-8080-exec-7] c.k.demo.controller.TestController : 库存扣减成功,剩余:0
2020-03-11 19:11:28.990 INFO 35616 --- [nio-8080-exec-9] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.050 INFO 35616 --- [io-8080-exec-11] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.088 INFO 35616 --- [io-8080-exec-13] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.140 INFO 35616 --- [io-8080-exec-15] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.190 INFO 35616 --- [io-8080-exec-17] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.239 INFO 35616 --- [io-8080-exec-19] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.289 INFO 35616 --- [io-8080-exec-22] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.340 INFO 35616 --- [io-8080-exec-23] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.390 INFO 35616 --- [io-8080-exec-26] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.441 INFO 35616 --- [io-8080-exec-25] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.489 INFO 35616 --- [io-8080-exec-29] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.540 INFO 35616 --- [io-8080-exec-31] c.k.demo.controller.TestController : 库存不足0
2020-03-11 19:11:29.590 INFO 35616 --- [io-8080-exec-33] c.k.demo.controller.TestController : 库存不足0
库存一共有5个,但是20个线程有7个线程都扣成功了。
问题分析
很明显,这里由于并发请求,导致了一些问题。回到java代码第10和第11行:
if(storage>0){
storage = storage - 1;
...
}
这里的if判断,storage扣减,storage – 1的结果赋值回storage,在jvm中,并不是一个原子操作。所以存在这种可能:
线程1,线程2均通过判断storage > 0,然后进入了storage = storage – 1这行代码。虽然这是一行代码,但是计算机在执行指令的时候,是
先读出storage值
storage - 1
赋值storage
这里的3步并不是原子操作。比如线程1和2都读出storage的值为4;然后进行扣减操作,结果为3;然后把3再赋值回storage. 这里,两个线程都扣库存成功了,但是storage的值却只减了1.
至此,问题的原因已经被我们找到了。
问题可能的解法
- 使用synchronized关键字将需要同步的代码块围起来
- 使用Lock类的实现将在判断前lock,操作后release
- 我想自己写一个同步器来进行同步操作
我看了一眼这篇文章的标题,那我只能采用第三种方式来做了。
How-怎么实现同步器
同步器关键要素
我为了进行同步操作,我需要考虑哪些问题呢?
1. 我希望这个同步器是排他的,也就是线程A占用了,其他线程就不能占用
2. 我希望这个同步器是公平的,也就是遵循先来后到,线程1先来,后面来的线程就等着
3. 我希望提供外部访问的接口,用来获取同步器和释放同步器。
4. 我希望中间不要引入悲观锁
为了实现1,我们得有个标记来表明同步器是不是处于可获取状态;同时我如果获取了同步状态,我还得标记上是我获取的。同时我还得有个地方存着那些没有成功获取的线程。
为了实现2,那些没有成功获取的线程,得是有序保存的,并且FIFO,很自然我们想到了队列。
为了实现3,那就是对外提供获取和释放的方法。
为了实现4,我们就得使用CAS(CompareAndSwap)方法,从而引入UnSafe类
于是同步器的关键要素就出来了:
对应的java代码为:
public class SampleLock {
private volatile int lockState = 0;//保存同步器状态
private volatile Thread lockHolder;//保存锁的持有者
private static Unsafe unsafe;//用来做Cas的类
private static long lockStateOffset;//用来存放lockState在SampleLock类中的偏移量
private static ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();
public void lock() {
}
public void unLock() {
}
}
Unsafe类的扩展:
顾名思义,Unsafe类是不安全的类。它是Jvm提供的可以直接对内存地址进行访问、操作的类。其具体详细的API在此不展开。但是需要注意,获得Unsafe实例只能通过反射,或者指定系统加载器加载(本文使用反射的方式)。本文主要会用到它里面的:
* objectFieldOffset(field):用于定位field这个属性在类中的偏移量(主要用于定位变量在内存中的地址用)
* compareAndSwapInt(targetVariable,targetOffset,expectValue,toUpdateValue):CAS操作int类型变量
targetVariable:将要更新的变量
targetOffset:变量的偏移量
expectValue:期待更新前的值
toUpdateValue:更新后的值(即将要更新为的目标值)
接下来,我们只需要填入lock()和unLock()的实现,不过我们先做一些初始化的工作:
public class SampleLock {
private volatile int lockState = 0;//保存同步器状态
private volatile Thread lockHolder;//保存锁的持有者
private static Unsafe unsafe;//用来做Cas的类
private static long lockStateOffset;//用来存放lockState在SampleLock类中的偏移量
private static ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();
static {
Field unsafeField = Unsafe.class.getDeclaredFields()[0];
unsafeField.setAccessible(true);
try {
unsafe = (Unsafe) unsafeField.get(null);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
try {
lockStateOffset = unsafe.objectFieldOffset(SampleLock.class.getDeclaredField("lockState"));
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
}
public void lock() {
}
public void unLock() {
}
}
以上的初始化工作,只是获取unsafe实例、初始化lockStateOffset的值。至此,我们自己写的同步器关键要素就已经出来了。
Lock方法的实现
首先,我们既然要CAS获取锁,不妨先一进来就CAS一下,CAS成功,我们就记录下哪个线程成功了(赋值lockHolder)。然后返回。
public void lock(){
Thread currentThread = Thread.currentThread();
if(unsafe.compareAndSwapInt(lockState,lockStateOffset,0,1) ){
this.lockHolder = currentThread;
return;
}
//...
}
但是如果CAS不成功呢?CAS如果不成功,我们希望它可以有多试几次的机会但是试多少次呢?不一定啊,那就无限重试?比如:
for(;;){
if(unsafe.compareAndSwapInt(lockState,lockStateOffset,0,1) ){
this.lockHolder = currentThread;
return;
}
}
这样的确从逻辑上来讲没有问题,但是并发多的情况下,大量线程在这边空跑不是浪费CPU么?我们考虑把在这个循环里面让线程停住不要占用CPU,于是很自然的想到了LockSupport.park()方法。
public void lock(){
Thread currentThread = Thread.currentThread();
if(unsafe.compareAndSwapInt(lockState,lockStateOffset,0,1) ){
this.lockHolder = currentThread;
return;
}
for(;;){
if(unsafe.compareAndSwapInt(lockState,lockStateOffset,0,1) ){
this.lockHolder = currentThread;
return;
}else{
waiters.offer(currentThread);
LockSupport.park();//线程在这里会停住,直到有其他线程通知它可以继续了。
}
}
}
这样一来,我们可以让持有锁的那个线程,在释放锁的时候,调用一下LockSupport.unpark(thread)方法讲它唤醒。为什么不用yeild(), sleep()等方法呢?因为我们不确定什么时候另外一个线程能够释放锁,只有持有锁的那个线程知道,那么就让它来通知,是最合适的方法。
同时,我们还需要将这个线程放入等待队列暂存起来。这样一来,持有锁的线程在释放的时候,直接让队列第一个元素unPark,这不就很nice了嘛。
UnLock方法的实现
unLock的主要目的是要释放锁,而释放锁需要更新两个变量:lockState=0和lockHolder=null. 这里我们要注意,我们只可以用CAS的方法操作某一个变量达到原子性,而我们要操作谁呢?回顾下lock()方法,一进来就要CAS的变量是lockState. lockState才是控制锁是否可用的关键变量。所以我们先将lockHolder置为空,然后CAS lockState从1到0(这里采用CAS的方法还有一个原因是我在实验中发现如果这个变量是CAS更新的,直接取lockState的值可能没有更新).
public void unlock(){
Thread currentThread = Thread.currentThread();
if(lockHolder != currentThread){
throw new RuntimeException("lock holder is not current thread");
} else {
lockHolder = null;
unsafe.compareAndSwapInt(lockState,lockStateOffset,1,0);
if(waiters.size()>0){
Thread firstInWatiers = waiters.poll();
LockSupport.unpark(firstInWatiers);
}
}
}
两个点需要注意下:
判断lockHolder是不是当前线程,只有lockHolder才能解锁。解锁完成后,如果队列不为空,需要将队首poll出来,并且unpark.
测试
再次使用JMeter模拟20个线程:
2020-03-11 20:47:23.528 INFO 25684 --- [nio-8080-exec-2] c.k.demo.controller.TestController : 库存扣减成功,剩余:4
2020-03-11 20:47:23.528 INFO 25684 --- [nio-8080-exec-7] c.k.demo.controller.TestController : 库存扣减成功,剩余:3
2020-03-11 20:47:23.529 INFO 25684 --- [nio-8080-exec-6] c.k.demo.controller.TestController : 库存扣减成功,剩余:2
2020-03-11 20:47:23.529 INFO 25684 --- [io-8080-exec-10] c.k.demo.controller.TestController : 库存扣减成功,剩余:1
2020-03-11 20:47:23.529 INFO 25684 --- [nio-8080-exec-3] c.k.demo.controller.TestController : 库存扣减成功,剩余:0
2020-03-11 20:47:23.549 INFO 25684 --- [io-8080-exec-11] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:23.599 INFO 25684 --- [io-8080-exec-14] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:23.650 INFO 25684 --- [io-8080-exec-18] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:23.699 INFO 25684 --- [nio-8080-exec-1] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:23.748 INFO 25684 --- [nio-8080-exec-5] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:23.797 INFO 25684 --- [nio-8080-exec-8] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:23.848 INFO 25684 --- [nio-8080-exec-9] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:23.899 INFO 25684 --- [io-8080-exec-12] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:23.950 INFO 25684 --- [io-8080-exec-16] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:23.999 INFO 25684 --- [io-8080-exec-20] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:24.050 INFO 25684 --- [io-8080-exec-26] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:24.100 INFO 25684 --- [io-8080-exec-30] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:24.148 INFO 25684 --- [io-8080-exec-34] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:24.197 INFO 25684 --- [io-8080-exec-38] c.k.demo.controller.TestController : 库存不足0
2020-03-11 20:47:24.247 INFO 25684 --- [io-8080-exec-42] c.k.demo.controller.TestController : 库存不足0
可以看到,只有5个线程成功扣减库存。从一定程度上,反应的程序的正确性。不过可能还有小的地方没有考虑周全,但其实AQS的核心逻辑和这个整体上是差不多的。
Conclusion 总结
本文通过简单的实现了一个同步器,从而初窥Doug Lea大师的某些思想。一个同步器基本上需要包含:
* 同步器状态标记
* 获取同步器的线程
* 没有获取到同步器的线程存放
* CAS
如果我们翻开AQS(AbstractQueueSynchronizer)的源码,我们可以看到它也采用了CAS;采用的不是队列而是双向链表来做等待队列,而在队列首尾更改的时候也用了CAS;提供了公平和非公平接口;state可以大于1来支持重入等。所以总体思想这个简单的同步器和AQS是一致的。
希望通过这篇文章,在记录下我笔记的同时,能够帮助更多人理解同步器,能够在读java.util.concurrent的源码的时候,有所启发。