ReentrantLock

ReentrantLock

  ReentrantLock是基于AQS同步器实现的互斥锁,它支持设置公平锁/非公平锁模式,同时具有可重入性。在这里讨论ReentrantLock对这些特性的支持及应用。

标准模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class X {

private final ReentrantLock lock = new ReentrantLock();

public void m() {

lock.lock();

try {

// ... method body

} finally {

lock.unlock();
}
}

}

简介

ReentrantLock默认使用非公平锁,也可以通过显式的使用公平锁

1
2
3
4
5
6
7
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

公平锁FairSync和非公平锁NonfairSync均继承于内部类Sync,而Sync继承AQS(AbstractQueuedSynchronizer)锁。获取锁和释放锁均在Sycn中实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void lock() {
sync.lock();
}

public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public void unlock() {
sync.release(1);
}

非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

abstract void lock();

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

在之前的 AQS同步器 提到过,AbstractQueuedSynchronizer为子类提供了需要实现的tryAcquire模板方法,非公平锁获取锁调用的底层核心方法是nonfairTryAcquire。首先基于AQS实现获取互斥锁的标准实现:当state为0时代表没有线程持有锁,因此尝试获取锁,如果获取锁成功则将当前线程设为持有锁的线程

1
2
3
4
5
6
7
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}

但和普通的互斥锁不同的是,ReentrantLock还需要支持可重入性:当state不为0(即存在线程持有锁),会继续判断持有锁的是否为当前线程,如果是则允许当前线程获取锁,并将state+1。

1
2
3
4
5
6
7
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}

那么释放逻辑也需要对重入性额外处理

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

首先确保释放锁的线程为持有锁的线程,接下去确保重入次数和释放次数相同(即state=0)才认为释放锁完成,才会将持有锁的线程设为空。

  ReentrantLock的非公平锁模式意味着多个线程获取锁的顺序并不是按照申请锁的顺序,会存在“线程饥饿”的问题

公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先判断当前线程是否位于等待队列中的第一个
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

公平锁和非公平锁唯一的区别在于,它通过hasQueuedPredecessors确保当前线程是否位于等待队列中的第一个时才会尝试竞争锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}

public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

那么也就是说公平模式的获取锁会先判断当前线程是否位于等待队列中的第一个,若不是则直接加入等待队列来确保多个线程按照申请锁的顺序来获取锁

  公平模式可以解决线程饥饿问题,但相比非公平模式,也会使得更多的线程阻塞,产生更多CPU唤醒阻塞线程的开销而影响吞吐量

Condition

Condition是一个多线程间协调通信工具类,在AQS中实现,子类可以创建Condition实现类

1
2
3
final ConditionObject newCondition() {
return new ConditionObject();
}

await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();

// 添加到Condition队列
Node node = addConditionWaiter();

// 释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;

// 判断是否在AQS队列,如果不在则阻塞
// 唤醒时会将当前线程重新插入AQS队列尾
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

// 自旋获取锁直到重新阻塞
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

Condition存在自己的队列,在Condition队列就意味着线程需要signal方法唤醒。await方法主要做以下几步:

  1. 将当前线程加入Condition队列尾
  2. 释放锁,即从AQS队列中退出(因此线程不会同时存在于AQS队列和Condition队列)
  3. 阻塞当前线程等待唤醒(唤醒时会将当前线程重新插入AQS队列尾,然后当它的前驱结点释放锁后unpark唤醒,唤醒后自旋/阻塞获取锁)

signal

1
2
3
4
5
6
7
8
9
10
11
public final void signal() {

// 确保当前线程持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)

// 将Condition队列中的首结点加入AQS队列
doSignal(first);
}

signal方法用于唤醒处于Condition队列中的首结点,但注意它并不是立刻唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void doSignal(Node first) {
do {
// 移除头结点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

Node p = enq(node);
int ws = p.waitStatus;

// 仅在前驱节点的状态处于取消状态或设置前驱节点状态为SIGNAL失败时才会直接唤醒
// 大部分情况都不会在这里唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

signal方法的主要逻辑如下:

  1. 首先它会将头结点从Condition队列取出
  2. 然后通过enq将当前线程加入AQS队列尾
  3. 仅在前驱节点的状态处于取消状态或设置前驱节点状态为SIGNAL失败时才会直接唤醒,否则是等待它在AQS队列的前驱结点释放锁后唤醒(这样它的前驱结点为头结点,它才有资格获取锁,唤醒才有意义)

signalAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

signalAll和signal的区别就在于它会遍历Condition队列,把所有Condition队列中的结点放入AQS队列等待唤醒。

应用

一个经典的应用:生产者/消费者模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumer {

private Queue<Integer> queue;

private Integer max;

private final ReentrantLock lock = new ReentrantLock();

private final Condition empty = lock.newCondition();

private final Condition full = lock.newCondition();

public ProducerConsumer(Queue<Integer> queue, Integer max) {
this.queue = queue;
this.max = max;
}

public void produce(){

new Thread(() -> {

Random random = new Random();

for(;;){

lock.lock();

try {

while (queue.size() >= max) {

try {

full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

int num = random.nextInt();

if(queue.size() >= max) {

break;
}

queue.offer(num);

empty.signalAll();
}finally {

lock.unlock();
}

}

System.out.println("Not safe");
}).start();
}

public void consume(){

new Thread(() -> {

for(;;) {

lock.lock();

try {

while (queue.isEmpty()) {

try {

empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

queue.poll();

full.signalAll();
} finally {

lock.unlock();
}
}

}).start();
}

public static void main(String[] args) {

Queue<Integer> queue = new LinkedList<>();

int max = 10;

ProducerConsumer producerConsumer = new ProducerConsumer(queue, max);

producerConsumer.produce();
producerConsumer.produce();
producerConsumer.consume();
producerConsumer.consume();
}

}

正常情况,不会出现 Not safe

  Synchronized + wait/notify的组合和Lock+Condition组合具有类似的功能,性能上的差别也不是很大,但它们仍然有许多区别。这里举几个典型的例子:

  • Lock+Condition可以选择公平/非公平模式,而Synchronized + wait/notify只能是非公平的
  • Lock+Condition可以唤醒指定Condition,而Synchronized + wait/notify不能指定
  • Lock+Condition可以设置超时时间,而Synchronized + wait/notify只能等待唤醒或中断
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×