【阅读笔记】 jdk12 juc 锁

分析 java.util.concurrent 中的锁如何实现

前言

java.util.concurrnet.lock 包实现了可重入锁和读写锁,主要目的是和 synchronized 一样, 两者都是为了解决同步问题,处理资源争端而产生的技术。

功能类似但有一些区别

  • lock 更灵活,可以自由定义多把锁的枷锁解锁顺序(synchronized 要按照先加的后解顺序)
  • 提供多种加锁方案,lock, trylock, lockInterruptily 等等
  • lock 支持公平锁或非公平锁,synchronized 是非公平锁
  • lock 使用 Unsafe.park 和 Unsafe.unpark 实现,底层使用了 mutex
    synchronized 使用 monitorenter 和 monitorexit 指令实现,底层使用了对象锁,比较复杂

基础

1. Unsafe park

Unsafe 提供 park 和 unpark 方法用于 block 和 unblock 线程,LockSupport 类对这些方法进行了封装

2. 同步队列

AbstractQueuedSynchronizer (AQS)提供一个同步框架。这个框架提供了一个 FIFO 队列用于实现阻塞等待,这
个队列上每个节点都是一个等待着的线程。

AQS 有 3 个主要成员

  • Node head,表示队列头
  • Node tail,表示队列尾部
  • int state,表示同步状态
    3 个成员都用 VarHandle 实现原子读写

Node 是 AQS 的嵌套类,成员有

  • int waitStatus,状态,取值有
    • SIGNAL,表示当前节点 BLOCKED,当这个节点释放时,需要 unpark 后续节点
    • CANCELLED,由于超时或 interrupt,这个节点取消,进入该状态后的结点将不会再变化
    • CONDITION,表示节点在 condition queue 中,如果取不变为 0,就不能用作同步队列的节点 TODO
    • PROPAGATE,共享模式相关
    • 0
  • Node prev
  • Node next
  • Thread thread,这个节点关联的线程
  • Node nextWaiter,指向下个等待的 condition 的 Node,或者 Node.SHARED

除 nextWaiter 的 4 个成员都用 VarHandle 实现原子读写

子类根据需要覆写下列成员方法就可以实现特定的功能,覆写这些方法需要使用 getState,setState 和 compareAndSetState 对 state 字段进行检查和修改

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

3. 独占 & 共享

  • 独占锁,一个线程获取了锁,其它线程就不能获取到,必须等锁释放
  • 共享锁,可以多个线程获取到锁

独占 or 共享,由 AQS 节点中的 nextWaiter 成员表示。 nextWaiter 等于 Node.SHARED 就表示共享,其他表示独占。

4. 公平 & 非公平锁

  • 公平锁,先启动的先获取锁,后启动的后获取锁
  • 非公平锁,不保证获得锁的顺序

这些由 AQS 的子类实现

Lock 接口

  1. 获取锁
1
2
3
4
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  1. 释放锁
1
void unlock();
  1. 产生 condition 对象
1
Condition newCondition();

ReentrantLock 实现

ReentrantLock 实现可重入锁,默认情况下创建的 ReentrantLock 是非公平锁,使用 ReentrantLock(boolean fair) 可以创建公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// ReentrantLock
public void lock() {
sync.acquire(1); // sync 可以是 NonfairSync 或 FairSync 类型
}

// AbstractQueuedSynchronizer 类的方法
public final void acquire(int arg) {
/*
tryAcquire 由 NonfairSync 或 FairSync 实现,如果获取不到锁,返回 false
NonfairSync 和 FairSync 都继承 Sync,区别是 FairSync 的 tryAcquire 方法
有调用 hasQueuedPredecessors,判断同步队列有没有前置节点,即有没有别的节点在等待,
有的话就放弃获取

tryAcquire 返回 false 的话,就会调用
1. 调用 addWaiter(Node.EXCLUSIVE),增加新的 waiter 到同步队列
2. 调用 acquireQueued(waiter, arg),以独占方式获取队列,会调用 tryAcquire,
如果获取不到当前线程 park,等待被 unpark
*/
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// ReentrantLock
public void unlock() {
sync.release(1);
}

// AbstractQueuedSynchronizer 类的方法
public final boolean release(int arg) {
/*
tryRelease 由 Sync 实现,减少 state 的值,如果减少后 state 为 0 就返回 true

tryRelease 返回 true 的话,就从 head 开始,unpark 第一个 waitState 是 SIGNAL 的节点
*/
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // upark 线程
return true;
}
return false;
}

ReadWriteLock 和 ReentrantReadWriteLock

ReadWriteLock 是读写锁,它其实由两把锁组成:读锁和写锁。写锁之间是共享模式,一个线程得到读锁,另
一个线程也可以继续获得;写锁和写锁、写锁和读锁之间是独占模式,一个线程得到写锁后,另一个线程既不能
获得写锁,也不能获得读锁

ReadWriteLock 接口提供两个方法
Lock readLock() 和 Lock writeLock() 分别获取读锁和写锁

ReentrantReadWriteLock 实现了 ReadWriteLock,嵌套类 WriteLock、ReadLock 分别实现了写锁和读锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// ReentrantReadWriteLock.WriteLock
public void lock() {
sync.acquire(1);
}

// AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// ReentrantReadWriteLock.Sync
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;

setExclusiveOwnerThread(current);
return true;
}

public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

// ReentrantReadWriteLock.ReadLock
public void lock() {
sync.acquireShared(1);
}

protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

public void unlock() {
sync.release(1);
}