ReentrantLock源码解析(二)

Xandy ·
更新时间:2024-09-21
· 951 次阅读

1 数据结构

ReentrantLock是可重入锁,又分为公平锁和非公平锁。类图如下:
在这里插入图片描述

1.1 AQS源码解析 https://blog.csdn.net/qq_34125999/article/details/105343472 1.2 Sync /** * ReentrantLock 基础结构 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 抽象方法,加锁方式,由公平锁、非公平锁实现 */ abstract void lock(); /** * 非公平锁获取资源 */ final boolean nonfairTryAcquire(int acquires) { //当前线程 final Thread current = Thread.currentThread(); //获取同步状态 int c = getState(); //如果状态为0,表示没有线程拥有资源 if (c == 0) { //cas修改同步状态,修改成功设置获取资源的线程,返回抢占资源成功 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果当前线程就是持有资源的线程 else if (current == getExclusiveOwnerThread()) { //同步状态增加acquires int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //更新同步状态,返回抢占资源成功 setState(nextc); return true; } //返回抢占资源失败 return false; } /** * 释放资源 */ protected final boolean tryRelease(int releases) { //获取当前同步状态,减去释放的资源 int c = getState() - releases; //判断释放资源的线程是否是当前线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); //资源标记 boolean free = false; //释放资源后,同步状态为0,那么释放成功,设置占有资源的线程为null if (c == 0) { free = true; setExclusiveOwnerThread(null); } //更新同步状态 setState(c); //返回资源标记 return free; } //判断当前线程是否是占有资源的线程 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } //创建condition final ConditionObject newCondition() { return new ConditionObject(); } //获取占有资源的线程 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } //获取同步状态 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } //判断是否可以抢占资源 final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } 1.3 NonfairSync /** * 非公平锁 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 加锁 */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } /** * 尝试获取资源 */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } 1.4 FairSync /** * 公平锁 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; /** * 加锁 */ final void lock() { acquire(1); } /** * 尝试获取资源 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } 2 非公平锁加锁 2.1 NonfairSync lock static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 加锁 */ final void lock() { //cas抢资源,如果同步状态是0,当前线程抢到资源,那么就不用入队,直接干活 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //获取一个资源 acquire(1); } /** * 尝试获取资源 */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } 2.2 AbstractQueuedSynchronizer acquire public final void acquire(int arg) { //尝试获取资源、加入队尾巴、进入等待 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 2.3 tryAcquire

(1) NonfairSync tryAcquire

protected final boolean tryAcquire(int acquires) { //非公平锁获取资源 return nonfairTryAcquire(acquires); }

(2) Sync nonfairTryAcquire

final boolean nonfairTryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取同步状态 int c = getState(); //当前状态为0,代表没有线程拥有资源 if (c == 0) { //cas当前线程抢占资源,抢占成功,设置抢到资源的线程,返回抢占资源成功 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果当前线程是拥有资源的线程 else if (current == getExclusiveOwnerThread()) { //同步状态增加资源量,更新状态,返回抢占资源成功 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //抢占资源失败 return false; } 2.4 AQS addWaiter(Node.EXCLUSIVE)

(1) addWaiter

/** * 添加等待者 */ private Node addWaiter(Node mode) { //创建一个独占节点 Node node = new Node(Thread.currentThread(), mode); //获取AQS队尾 Node pred = tail; //如果队尾存在 if (pred != null) { //当前节点的前指针指向队尾 node.prev = pred; //cas快速更新队尾,(expect:队尾,update:当前节点).更新成功,更新以前队尾的后指针 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //自旋更新队尾 enq(node); //返回当前节点 return node; }

(2) AQS enq(node)

/** * 自旋向CLH队列队尾插入数据 */ private Node enq(final Node node) { //自旋 for (;;) { //获取队尾 Node t = tail; //如果队尾为空,创建CLH队列的队头、队尾 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //当前节点的前指针指向队尾 node.prev = t; //cas设置队尾,设置成功后更新队尾的后指针,返回当前节点 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 2.5 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

(1) AQS acquireQueued

/** * 自旋,等待 或 成功抢占资源返回 */ final boolean acquireQueued(final Node node, int arg) { //失败标记 boolean failed = true; try { //打断标记 boolean interrupted = false; //自旋 for (;;) { //获取当前节点的前驱节点 final Node p = node.predecessor(); //如果当前节点是CLH队列第二个节点,并且抢占了资源 if (p == head && tryAcquire(arg)) { //更新头节节点 setHead(node); //原头节点后指针清空 p.next = null; // help GC failed = false; return interrupted; } //判断是否进入等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

(2) AQS shouldParkAfterFailedAcquire

/** * 是否进入等待 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获得当前节点前驱节点的状态 int ws = pred.waitStatus; //如果前节点是SIGNAL状态,进入等待状态,返回true if (ws == Node.SIGNAL) return true; //如果前驱点解的状态是(CANCELLED),寻找有效的前驱节点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } //cas把前驱节点设置为SIGNAL状态 else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //返回false,重新进入自旋 return false; }

(3) AQS parkAndCheckInterrupt

/** * 调用LockSupport进行等待(LockSupport调用的是unsafe) */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 2.6 时序图

在这里插入图片描述

3 公平锁加锁 /** * 流程和非公平锁差不多,当一个新线程进来非公平锁有抢资源操作,如果抢到资源那么不用进入CLH队列,非公平锁必须入CLH队列 * 所以非公平锁比公平锁快 */ final void lock() { acquire(1); } 4 解锁 4.1 ReentrantLock unlock public void unlock() { sync.release(1); } 4.2 AQS release /** * 释放资源 */ public final boolean release(int arg) { //尝试释放资源 if (tryRelease(arg)) { //获取头指针 Node h = head; //如果头指针不为空,状态是非CANCELLED状态,唤醒后续有效线程 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

(1) AQS tryRelease

protected final boolean tryRelease(int releases) { //计算当前同步状态 int c = getState() - releases; //如果释放资源线程不是拥有资源的线程则抛出错误 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); //空闲资源标记 boolean free = false; //如果同步状态为0,表示当前没有线程拥有当前资源,空闲标记为true if (c == 0) { free = true; setExclusiveOwnerThread(null); } //更新同步状态 setState(c); return free; }

(2) AQS unparkSuccessor

/** *唤醒等待线程 */ private void unparkSuccessor(Node node) { /* * 当前头节点的状态 */ int ws = node.waitStatus; //如果头节点的状态不是初始化状态,那么cas设置头节点的状态为初始化状态 if (ws 0) { //s指向null s = null; //从尾部找有效节点并且赋值给s for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //如果s指向节点不为null,那么唤醒s节点的线程 if (s != null) LockSupport.unpark(s.thread); }
作者:篮战丶



reentrantlock

需要 登录 后方可回复, 如果你还没有账号请 注册新账号