ReentrantLock源码解析(一)

Saadiya ·
更新时间:2024-09-21
· 756 次阅读

本篇博客深入源码分析 ReentrantLock 加锁过程

ReentrantLock可以实例化两种锁,FairSync和NonfairSync

ReentrantLock lock = new ReentrantLock(true); lock.lock();

本篇以公平锁为例

ReentrantLock.java

final void lock() { acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // 获取锁 protected final boolean tryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取当前lock的状态值 int c = getState(); //判断当前锁是否空闲 if (c == 0) { //判断当前的锁状态空闲,继续判断是否需要排队,若!hasQueuedPredecessors()返回true,不需要排队,cas获取锁,设置当前线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 当前锁不空闲,判断当前线程是否为拥有锁的线程,如果是,重入锁,状态值+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

首先要理解,ReentrantLock的底层用到三个重要的东西:CAS,双向链表组成的FIFO队列,Park()和Unpark(),他解决同步问题的思路是把并发问题尽可能的在jdk解决,减少OS的内核态切换;这里要对比下synchronized关键字,1.6之前synchronized是一把重量级锁,每次都会切换内核态和用户态,之后sun不断优化synchronized,在java对象头上增加了锁升级的过程,减少OS操作,1.8之后基本两者的同步效率相同,或者在并发达到一定级别,synchronized的效率更高一些

再一个就是什么情况下会存在锁竞争,交替执行是不存在锁竞争的,接下来会模拟线程的多种情况:

//判断是否需要排队 public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; /** 下面的这一行代码单独拿出来分析下,很复杂,情况很多,建议对整个加锁流程熟悉后单独来分析 第一种情况,队列没有初始化,即不存在锁竞争,此时的head==tail==null,return false,不需要排队 第二种情况,队列初始化过只有一个节点,这时的h==t==node,return false,不需要排队 第三种情况,队列至少有两个节点,此时h!=t为true,继续判断head的next,也就是整个队列的第二个节点s是否为null,一定不是null,false, 再去判断s是不是当前的线程,后面会说到,排在第二个的节点的线程是队列优先级最高的,如果是当前线程,说明现在优先级最高的线程想要获取锁 s.thread != Thread.currentThread()返回false,整个表达式返回false,不需要排队 */ return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }

1.交替执行,没有锁竞争,队列也没有初始化,只有thread1来了,获取锁,c>0,执行完毕,释放锁,c=0;

此时thread2 来,head==tail==null,hasQueuedPredecessors()返回false,!hasQueuedPredecessors()返回true,cas加锁

2.存在锁竞争,t1持有锁,t2来c>0;!tryAcquire(arg)返回true,入队acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

先说一下链表的Node节点结构

static final class Node{ /** 双向链表,一个prev,一个next,一个Thread 存储线程实例,其他属性暂时忽略 */ volatile Node prev; volatile Node next; volatile Thread thread; }

再说一下队列结构AbstractQueuedSynchronizer.java

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** 只展示双向链表的头节点和尾节点,其他属性暂时忽略 */ private transient volatile Node head; private transient volatile Node tail; }

入队的过程

private Node addWaiter(Node mode) { // 创建当前线程的Node节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 把tail记录到临时节点pred,如果pred==null,说明队列没有初始化过,此时head==tail==null,直接入队enq(node) // 如果pred!=null,队列至少有一个节点,这时做链表关系的整理,tail=node,node作为原来队列末尾的next if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ // 把node节点加到队列,死循环设计,包含了队列初始化和添加尾节点;返回整理完毕后的尾节点 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 初始化队列 // 注意,这一步是最牛逼的,保证队列的head是一个空节点,他的思想是我的队列第一个不需要排队,他表示正在处理,node的thread=null //真正第一个排队的是队列的第二个node中的thread if (compareAndSetHead(new Node())) // 这时tail==head==new node 不等于null tail = head; } else { // 队列不为空,设置新的tail,返回尾节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

一般我们认为,你获取不到,入队阻塞就行了;但是Doug Lea 多给了一次机会!注意,这里是自旋,又是一次死循环设计的骚操作

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取当前节点的前一个节点 final Node p = node.predecessor(); //判断如果当前节点的前一个节点是head,那么我们认为当前节点是队列的第二个节点,也就是排队优先级最高的线程,我们会让他去尝试获取锁 //为什么?因为可能这个时候拥有锁的t1释放了锁,但是没来得及唤醒(锁的释放有很多步,中间的cpu时间片没来得及处理完,比如set state=0; // LockSupport.unpark(t2);这时我们认为中间过程,t2是可以主动去获取锁的,所以有了这一次的自旋) if (p == head && tryAcquire(arg)) { // tryAcquire 和之前的一样,尝试获取锁,判断state==0,判断是否需要排队,如果成功获取锁,剔除旧的head,剔除旧引用,clear node的thread // 返回false,外层(!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 返回false,条件不成立,不执行selfInterrupt() setHead(node); //head = node; //node.thread = null; //node.prev = null; p.next = null; // help GC failed = false; return interrupted; } // 获取锁失败,执行锁失败后的是否park的逻辑,当shouldParkAfterFailedAcquire方法返回true,继续往下执行parkAndCheckInterrupt() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

接下来分析下执行锁失败后的是否park的逻辑

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 解释一下 waitStatus是node的属性,加锁过程只讨论 -1和0 // 获取前节点的ws,第一次进来是0,因为ws默认是0,整个加的过程没有改变过;外层调用是一个for的死循环,外层加锁不成功再次进入这个方法 // 此时判断ws == Node.SIGNAL ws==-1成立,返回true int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //第一次进来是0,代表前一个节点不是排队的状态,cas将前一个节点的ws变更为-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

到这终于要阻塞,park上场

private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

LockSupport.java

public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }

Unsafe.class 这里是class文件 三个本地方法

public native void unpark(Object var1); public native void park(boolean var1, long var2); private static void setBlocker(Thread t, Object arg) { // Even though volatile, hotspot doesn't need a write barrier here. UNSAFE.putObject(t, parkBlockerOffset, arg); }

看到这几个代码段,有个难点是,为什么会涉及到一个Thread.interrupted()?????其实对于一个reentrantlock来说,最开始是这一行代码

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

我第一次看这个代码的时候,看错了,实际上selfInterrupt()是当大的条件返回true时才会调用,为什么?我再来分析一遍

public final void acquire(int arg) { if (!tryAcquire(arg) // tryAcquire 判断是否获取锁成功,如果获取锁成功,!tryAcquire返回false,整个if条件false,线程不会阻断,就相当于加了个if判断, //会继续执行lock之后的代码,也就是临界区内的代码,那么什么情况下会加锁,获取锁失败,执行入队,才有可能阻塞 && // 在哪里阻塞的?上面的提到的shouldParkAfterFailedAcquire(p, node) 返回true之后,会执行parkAndCheckInterrupt(),这个代码里有两行, // 一行LockSupport.park(this)底层调用的是native park来阻塞,这个能理解,因为我之前就说过,但是为什么要返回一个Thread.interrupted() // 实际上,单独看lock方法,完全跟这个interrupted没有关系,也就是说,我这个方法只管阻塞!他就算是个void,lock也可以实现,这里的迷惑代码实际上是 //因为另外一个方法lock.lockInterruptibly(),见下代码段 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ selfInterrupt(); } } // 这个方法包装了lock,增加判断,我的线程是否被interrupted过,如果是,抛异常;和lock一样,尝试加锁;若失败,入队 public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }

这是lockInterruptibly()的入队逻辑,唯一区别的是下面注释中部分

private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

为什么parkAndCheckInterrupt() 要返回一个boolean的return Thread.interrupted()?因为对于lockInterruptibly()而言,根据这个来判断在park的过程中线程是否被打断过,返回true,整个条件返回true,抛异常;但是lock()和parkAndCheckInterrupt() 都用了parkAndCheckInterrupt(),所以lock把原来的void 改造了

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;

线程被唤醒时,返回Thread.interrupted(),对于lock()来说,大条件返回true,interrupted=true,for循环获取锁,方法acquireQueued返回true,执行selfInterrupt(),恢复thread用户状态;

总的来说,如果只有lock,一个void park操作就可以,为了方法复用,才做的改造

加锁过程到这里分析结束,下一篇分析ReentrantLock解锁


作者:Onstduy



reentrantlock

需要 登录 后方可回复, 如果你还没有账号请 注册新账号