基础同步工具类

基础同步工具类

Semaphore,CountDownLatch,CyclicBarrier均是jdk1.5提供的基础并发工具:

  • Semaphore是一个计数信号量,用于限制同时访问某个特定资源的数量
  • CountDownLatch是一个闭锁,允许一个或多个线程等待一组其他线程执行完成后执行,但只能使用一次
  • CyclicBarrier是一个循环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行,并且支持重复使用

本文内容默认读者已经理解AQS,若存在疑问,请先前往 AQS同步器 了解

Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class X{

Random random = new Random();

Semaphore semaphore = new Semaphore(10, true);

// 方法本身控制同步
public synchronized Integer getNextAvailableItem() throws InterruptedException {

// 信号量控制访问次数
semaphore.acquire();

return random.nextInt();
}

public synchronized boolean markAsUnused(Integer item){

// do something

semaphore.release();

return true;
}
}

这是Semaphore的一个标准的使用方式,用于控制流量。上述程序创建了一个允许10个线程同时访问的信号量,并且使用公平锁(一般来说用于控制流量的使用需要使用公平模式,用于防止线程饥饿),然后在提供获取资源的接口getNextAvailableItem方法前先获取凭证,在释放资源后释放凭证。但是注意Semaphore不保证并发正确性,这需要接口自己保证,因此这里使用synchronized来提醒这一点。

简介

Semaphore默认使用非公平锁,也可以显示的设置使用公平锁

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

公平锁FairSync和非公平锁NonfairSync均继承于内部类Sync,而Sync继承AQS(AbstractQueuedSynchronizer)锁。获取锁和释放锁均在Sycn中实现

1
2
3
4
5
6
7
8
9
10
11
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public void acquireUninterruptibly() {
sync.acquireShared(1);
}

public void release() {
sync.releaseShared(1);
}

acquire

acquire在获取非公平锁的实现底层核心方法为nonfairTryAcquireShared

1
2
3
4
5
6
7
8
9
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

Semaphore在初始化的时候会将state设为凭证数,在每次获取锁时nonfairTryAcquireShared会将state-1直到state为0,当state为0时则代表不可以再获取共享锁了。在具体实现上,这是一个标准的子类获取共享锁的实现模式。它本质是一个共享锁,会允许多个线程同时进入,因此在之前的使用介绍也提到了Semaphore不能确保并发正确性。

1
2
3
4
5
6
7
8
9
10
11
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

而对于公平模式的获取锁和ReentrantLock的实现相同,会先调用hasQueuedPredecessors来判断当前线程是否位于等待队列中的第一个,仅在处于队列的第一个时才会尝试获取锁,从而保证了获取锁的先后顺序。

还需要注意的是,无论是公平锁还是非公平锁,Semaphore的acquire是调用的acquireSharedInterruptibly,因此它是可中断的

1
2
3
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

如果需要使用不支持中断的,可以使用acquireUninterruptibly

1
2
3
public void acquireUninterruptibly() {
sync.acquireShared(1);
}

release

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void release() {
sync.releaseShared(1);
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

实际上就是调用AQS的释放共享锁的方法,那么本质上尝试释放锁就是通过重载tryReleaseShared实现的。因为共享锁的释放锁是存在并发的,所以需要通过CAS自旋更新state状态,每次释放都会将state+1。这也是一个标准的子类释放共享锁的实现模式。因此我们也要警惕使用release,因为它会导致当前state大于凭证数,意味着如果释放次数大于获取次数会导致同时允许的线程数大于凭证数。

  There is no requirement that a thread that releases a permit must have acquired that permit by calling acquire(). Correct usage of a semaphore is established by programming convention in the application.

  没有要求释放许可证的线程必须先通过调用acquire()获得该许可证。通过应用程序中的编程约定来建立信号量的正确使用。

应用

了解了Semaphore原理后,这里通过Semaphore实现一个线程池只能同时执行两个任务的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

public static void main(String[] args) {

final Semaphore semaphore = new Semaphore(2, true);

AtomicInteger count = new AtomicInteger(0);

ExecutorService executorService = new ThreadPoolExecutor(
5, 5, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100),
new ThreadPoolExecutor.DiscardOldestPolicy()
);

int threadCnt = 10;
for (int i = 0; i < threadCnt; i++) {

executorService.execute(new Task(semaphore, count));
}

executorService.shutdown();

try {
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

}

class Task implements Runnable {

private Semaphore semaphore;

private AtomicInteger count;

public Task(Semaphore semaphore, AtomicInteger count) {
this.semaphore = semaphore;
this.count = count;
}

@Override
public void run() {

try {

semaphore.acquire();
} catch (InterruptedException e) {

System.out.println("Semaphore 中断 " + Thread.currentThread().getName());

return;
}

try {

doIt();
} finally {

semaphore.release();
}
}

private void doIt() {

System.out.println(Thread.currentThread().getName() + ": is running [" + count.addAndGet(1) + "]");

try {

Thread.sleep((int)(1 + (Math.random() * 3)));

} catch (InterruptedException e) {
e.printStackTrace();
} finally {

count.decrementAndGet();
}
}
}

Task任务会在执行前先获取信号量,并对同时在运行的任务进行计数,在执行完任务后会重置计数并释放信号量。在实现过程中需要注意两个重要的点,也是实际使用时需要注意的点:

  1. doIt执行任务本身需要保证并发安全,所以count使用Atomic类。
  2. Semaphore的acquire和release不要在同一个try中,否则当acquire获取失败时仍然会执行release,而release并不控制凭证数,这会导致有可能产生比设置时更大的state

在这个案例中,正常结果打印出来的同时运行的线程数不会超过2,比如:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-2: is running [2]
pool-1-thread-1: is running [1]
pool-1-thread-3: is running [2]
pool-1-thread-4: is running [2]
pool-1-thread-2: is running [1]
pool-1-thread-5: is running [2]
pool-1-thread-1: is running [2]
pool-1-thread-3: is running [1]
pool-1-thread-4: is running [2]
pool-1-thread-5: is running [2]

重点回顾

  • Semaphore底层通过AQS共享锁实现,支持公平/非公平模式
  • Semaphore应用场景主要用于控制流量
  • Semaphore并不保证并发正确性,需要接口本身保证
  • Semaphore的release释放次数大于acquire获取次数时会导致并发数大于凭证数,因此这需要由调用者正确控制

CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import java.util.concurrent.*;

public class Main {

public static void main(String[] args) {

int threadCnt = 10;

CountDownLatch startSignle = new CountDownLatch(1);

CountDownLatch doneSignle = new CountDownLatch(threadCnt);

ExecutorService executorService = new ThreadPoolExecutor(
5, 5, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100),
new ThreadPoolExecutor.DiscardOldestPolicy()
);


for (int i = 0; i < threadCnt; i++) {

executorService.execute(new Task(startSignle, doneSignle));
}

System.out.println("All task start.");

startSignle.countDown();

try {
doneSignle.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("All task done.");

executorService.shutdown();
}

}

class Task implements Runnable {

private CountDownLatch startSignle;

private CountDownLatch doneSignle;

public Task(CountDownLatch startSignle, CountDownLatch doneSignle) {
this.startSignle = startSignle;
this.doneSignle = doneSignle;
}

@Override
public void run() {

try {

startSignle.await();

System.out.println(Thread.currentThread().getName() + " is running");

Thread.sleep(2000);

} catch (InterruptedException e) {
e.printStackTrace();
} finally {

doneSignle.countDown();
}
}
}

上述程序创建了两个分别用于启动和结束的CountDownLatch,startSignle用于所有子线程等待主线程发送执行的信号,doneSignle用于主线程等待所有子线程完成的信号。正常结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
All task start.
pool-1-thread-2 is running
pool-1-thread-3 is running
pool-1-thread-4 is running
pool-1-thread-1 is running
pool-1-thread-5 is running
pool-1-thread-4 is running
pool-1-thread-1 is running
pool-1-thread-2 is running
pool-1-thread-5 is running
pool-1-thread-3 is running
All task done.

简介

CountDownLatch的构造函数只有一个参数,用于控制await线程被执行前必须先执行线程的个数

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

Sync继承AQS(AbstractQueuedSynchronizer)锁,因此CountDownLatch也是基于AQS的一个实现

1
2
3
4
5
6
7
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public void countDown() {
sync.releaseShared(1);
}

await调用的acquireSharedInterruptibly意味着它支持中断

await

1
2
3
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

await的获取共享锁的方式和之前所说的Semaphore的实现对state的处理是完全相反的:

  • 在Semaphore中是在state大于0时允许获取锁
  • 在CountDownLatch中是在state=0时允许获取锁

这很好理解,await的线程需要在N个线程执行countDown后才允许被唤醒,和Semaphore的逻辑正好相反。

countDown

1
2
3
4
5
6
7
8
9
10
11
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

countDown本质上就是释放共享锁,每次执行会将state-1直到0。在释放到state=0后不会再释放。这就说明了两个问题:

  • 即使执行countDown的次数大于初始化时设置的count值也是不会有问题的,因为releaseShared会直接返回释放失败。
  • 即使先执行countDown,只要执行到足够的次数,再执行await也能成功获取到锁。

  为什么await方法获取锁成功是返回1,而不是0呢?

  这是因为在AQS中,释放共享锁后会唤醒后继结点,而后续的唤醒则依赖于获取锁的线程的传播式向后唤醒结点,而这依赖于tryAcquireShared的返回结果,当返回0时会被AQS认为无剩余共享资源导致无法唤醒后续结点。那么这就会导致最后一个countDown执行完后无法唤醒所有由await阻塞的线程

最后那么我们来模拟多个线程等待多个线程执行完成后唤醒的过程:假设A, B两个线程等待m, n线程执行完成才能执行,而A, B先于m, n执行

  1. A线程调用await,因为state为2进入AQS等待队列,为头结点
  2. B线程调用await,因为state为2进入AQS等待队列,插入队尾
  3. m线程调用countDown,将state设为1,释放锁成功,尝试唤醒A,A尝试获取锁但因为state!=0,唤醒失败
  4. n线程调用countDown,将state设为0,释放锁成功,尝试唤醒A,A尝试获取锁因为state=0,A唤醒成功
  5. A获取锁成功返回1允许传播式尝试唤醒B,B尝试获取锁因为state=0,B唤醒成功

重点回顾

  • CountDownLatch底层通过AQS共享锁实现
  • CountDownLatch的应用场景为一个或多个线程等待一组其他线程执行完成后执行
  • CountDownLatch的countDown次数大于初始化时设置的count值时会抛出异常
  • CountDownLatch的countDown方法可以先于await方法先执行

CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

public static void main(String[] args) {

int threadCnt = 5;

CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCnt);

AtomicInteger count = new AtomicInteger(0);

ExecutorService executorService = new ThreadPoolExecutor(
5, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100),
new ThreadPoolExecutor.DiscardOldestPolicy()
);

for (int i = 0; i < threadCnt; i++) {

executorService.execute(new Task(cyclicBarrier, threadCnt * 2, count));
}

executorService.shutdown();

try {
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("All tasks done.");

}

}

class Task implements Runnable {

private CyclicBarrier cyclicBarrier;

private AtomicInteger count;

private Integer taskCnt;

public Task(CyclicBarrier cyclicBarrier, Integer taskCnt, AtomicInteger count) {
this.cyclicBarrier = cyclicBarrier;
this.taskCnt = taskCnt;
this.count = count;
}

@Override
public void run() {

try {

while (!isDone()){

System.out.println(Thread.currentThread().getName() + " starts.");

Thread.sleep(2000);

count.addAndGet(1);

System.out.println(Thread.currentThread().getName() + " is waiting.");

cyclicBarrier.await();
}

System.out.println(Thread.currentThread().getName() + " quit.");

} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}

private boolean isDone(){
return taskCnt.equals(count.get());
}
}

上述程序创建了一个需要5个线程到达后每个线程才能执行后续流程的循环栅栏CyclicBarrier。每个任务执行完毕后会将count+1以及通过await等待其他线程完成。而每个线程完成所有任务的标志是count=10,因此在前5个执行完成后又会重复的执行一轮,最后全部线程退出。正常结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
pool-1-thread-2 starts.
pool-1-thread-5 starts.
pool-1-thread-4 starts.
pool-1-thread-3 starts.
pool-1-thread-1 starts.
pool-1-thread-5 is waiting.
pool-1-thread-4 is waiting.
pool-1-thread-3 is waiting.
pool-1-thread-1 is waiting.
pool-1-thread-2 is waiting.
pool-1-thread-2 starts.
pool-1-thread-5 starts.
pool-1-thread-4 starts.
pool-1-thread-3 starts.
pool-1-thread-1 starts.
pool-1-thread-4 is waiting.
pool-1-thread-1 is waiting.
pool-1-thread-5 is waiting.
pool-1-thread-3 is waiting.
pool-1-thread-2 is waiting.
pool-1-thread-2 quit.
pool-1-thread-4 quit.
pool-1-thread-3 quit.
pool-1-thread-1 quit.
pool-1-thread-5 quit.
All tasks done.

  CyclicBarrier结合线程池使用需要注意死锁问题,当线程池可执行线程数小于CyclicBarrier触发栅栏的线程时会产生死锁

简介

CyclicBarrier存在两个构造函数,parties用于执行在触发栅栏之前需要执行的线程数,barrierAction为触发栅栏的线程首先执行该任务后才会唤醒所有等待的线程

1
2
3
4
5
6
7
8
9
10
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
this(parties, null);
}

await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 当前代
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;
// 当index=0时代表触发栅栏
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 首先执行传入的任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 然后更新代,唤醒所有等待线程
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// 如果还没触发栅栏,则阻塞
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 如果未设置超时时间,则直接阻塞
if (!timed)
trip.await();
// 否则对阻塞设置超时时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

CyclicBarrier是一个可以循环使用的栅栏,因此它有一个“代”的概念,即每个在触发栅栏之前需要执行的线程数为一代,每执行一次任务则会将需要执行的线程数减一直到0,这时候就进入了新的一代,即新的循环。 await实际上通过ReentrantLock + Condition完成线程的阻塞和唤醒:

  1. 判断当前线程在当前代中的位置,如果还不能触发栅栏,则调用condition.await/awaitNanos对当前线程进行阻塞

  2. 如果一个线程触发了栅栏,首先执行传入的Runnable任务,然后唤醒所有等待的任务,再更新代

  3. 每个被唤醒的线程检查当前代是否已经更新,如果已经更新,则返回在阻塞时还剩余需要执行的线程数。因此 barrier.await() == 0 时意味着当前代需要执行的最后一个线程已完成,可以做些一轮任务做完需要做的工作,比如整合,日志等

    1
    2
    3
    if (barrier.await() == 0) {
    // log the completion of this iteration
    }

    但是它和barrierAction还是有些区别,barrierAction中执行的内容会在唤醒其他线程前执行(新代执行前),而 barrier.await() == 0 内执行的内容则是在唤醒其他线程后执行的(有可能新代已经开始执行),因此在使用时需要多加考虑

  CountDownLatch和CyclicBarrier比较相似,都是多个线程相互等待后执行,但它们还是有比较大的区别:

  1. 从实现来看,CountDownLatch使用的是共享锁,所以一次countDown能唤醒所有await等待的线程;而CyclicBarrier使用的互斥锁+Condition的方式,由调用await触发栅栏的线程来唤醒一个代中的所有线程(signAll)
  2. 从功能来看,CountDownLatch只允许使用一次,而CyclicBarrier允许循环使用
  3. 从应用来看,CountDownLatch适用于一个或多个线程等待一组线程执行完成后执行,比如初始化;而CyclicBarrier适合用于一组线程相互之间等待,达到一个共同点,再继续执行。比如并行计算,计算中涉及多个子任务阶段式完成任务
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×