12构建自定义的同步工具

构建自定义的同步工具

状态依赖性的类,操作有基于状态的前提条件:

  • 如FutureTask,获取一个任务结果前提条件是任务状态为“任务已完成”;
  • 如BlockingQueue,从队列中删除元素前提条件是队列状态为“非空”。
  • 如ReentrantLock,获取锁成功的前提条件是锁的状态为“可获取”。

创建条件依赖类最简单的方式:利用现有的同步工具类

基于状态的前提条件与先验条件概念上的区别?

先验条件的目的是验证对象的安全性

获取锁之后,根据前提条件判断能否执行后续操作

状态依赖性管理

单线程和多线程中基于状态的前提条件的区别?

单线程中前提条件失败了,可以直接失败退出

多线程中前提条件可能被其它线程变更,下一刻可能就满足了,因而需要条件等待处理

等待前验状态条件的传统方式:

1
2
3
4
5
6
7
8
9
10
11
12
//可阻塞的状态依赖操作的结构
acquire lock on object state//请求锁
while (precondition does not hold)//判断前提条件
{
//不满足
release lock//释放锁
wait until precondition might hold//等待直到条件满足
optionally fail if interrupted or timeout expires//超时或中断
reacquire lock//重新请求锁
}
perform action//后续操作
release lock//释放锁

现通过一个有界队列的实现来逐步理解条件等待的演变过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//有界缓存实现的基类
public abstract class BaseBoundedBuffer<V> {
private final V[] buf;
private int tail;
private int head;
private int count;

protected BaseBoundedBuffer(int capacity){
this.buf = (V[]) new Object[capacity];
}

protected synchronized final void doPut(V v){
buf[tail] = v;
if (++tail == buf.length){
tail = 0;
}
++count;
}

protected synchronized final V doTake(){
V v = buf[head];
buf[head] = null; //let gc collect
if (++head == buf.length){
head = 0;
}
--count;
return v;
}

public synchronized final boolean isFull(){
return count == buf.length;
}

public synchronized final boolean isEmpty(){
return count == 0;
}
}

演变一:将前提条件的失败传递给调用者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class GrumyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
public GrumyBoundedBuffer(int size){
super(size);
}

public synchronized void put(V v){
if (isFull()){
throw new BufferFullException();
}
doPut(v);
}

public synchronized V take(){
if (isEmpty())
throw new BufferEmptyExeption();
return doTake();
}
}

//当不满足前提条件时,有界缓存不会执行相应的操作

调用者在使用时,并不应该把不满足前提条件作为一种异常,所以往往需要自行捕获重试,如下:

1
2
3
4
5
6
7
8
9
10
11
while (true){
try{
V item = buffer.take();
//对item的其它操作
break;
}catch (BufferEmptyExeption e){
Thread.sleep(timewait)
}
}

//这样处理并不是很漂亮,程序会到处遍布这种不优雅的操作

演变二:通过轮询与休眠来实现简单的阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class SleepyBounedBuffer<V> extends BaseBoundedBuffer<V> {
private static long SLEEP_TIME;
public SleepyBounedBuffer(int size) {
super(size);
}

public void put(V v) throws InterruptedException{
while (true){
synchronized(this){
if (!isFull()){
doPut(v);
return;
}
}
Thread.sleep(SLEEP_TIME);
}
}

public V take() throws InterruptedException{
while (true){
synchronized(this){
if (!isEmpty()){
return doTake();
}
}
Thread.sleep(SLEEP_TIME);
}
}
}

//“轮询与休眠“重试机制

这样处理的关键时要选择合适的休眠时间,这是关于响应性和CPU效率的权衡,下图是对响应性的影响:

image-20200909155341758

更好的方式:当方法由于某个条件变更时,需要提供某种取消机制,如处理中断。但是如此处理也有不小的复杂性,我们需要一种更为简单的处理方式。

条件队列

名字来源:使线程能通过某种方式等待特定条件变成真。传统队列中是一个个元素,条件队列中是一个个正在等待相关条件的线程

与每个对象都可以作为一个锁一样,每一个对象可以作为一个条件队列:

  • Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并且修改对象的状态
  • Object.notify/notifyAll通知被挂起的线程可以重新请求资源执行

与“休眠”的有界队列相比,条件队列没有改变原来的语义。只是在CPU效率、上下文切换、响应性上做了优化。

如果一个功能通过前者无法实现,那通过后者也无法实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {

public BoundedBuffer(int capacity) {
super(capacity);
}

public synchronized void put(V v) throws InterruptedException{
while (isFull()){
wait();
}
doPut(v);
notifyAll();
}

public synchronized V take() throws InterruptedException{
while (isEmpty()){
wait();
}
V v = doTake();
notifyAll();
return v;
}
}

正确的使用条件队列

虽然有许多规则确保正确的使用条件队列,但是编译器或者系统平台并没有强制要求遵循这些规则。(这也是尽量基于现有状态依赖性类来构造程序的原因之一)

条件谓词

要想正确的使用条件队列,关键是找出队列在哪个条件上等待。

什么是条件谓词?

条件谓词是使用条件队列的关键,但是在代码上没有什么API规范去正确使用,因而可能会有一些困惑

条件谓词是状态依赖操作的前提条件,如上诉中的“任务已完成”,“队列非空”,“锁可获取”

条件队列、条件谓词、锁之间额关系?

  • 条件谓词包含多个状态变量,状态变量需要用锁保护,因而判断条件谓词需先持有锁。
  • 当使用条件队列时,调用者必须持有与条件队列相关的锁。

过早唤醒

一个条件队列和多个条件谓词相关。过早唤醒可能发生于:

  • 被其他条件谓词的变更线程唤醒
  • 唤醒时条件谓词可能为真,但是重新判断时又可能假了

因此,条件谓词的判断和wait操作必须放在循环中,使用的标准模板如下:

1
2
3
4
5
6
7
8
void stateDependentMethod() throws InterruptedException
{
synchronized(lock) // 必须通过一个锁来保护条件谓词
{
while(!condietionPredicate())
lock.wait();
}
}

当使用条件等待时(如Object.wait(), 或Condition.await()):

  • 通常都有一个条件谓词–包括一些对象状态的测试,线程在执行前必须首先通过这些测试
  • 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试
  • 在一个循环中调用wait
  • 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量
  • 当调用wait, notify或notifyAll等方法时,一定要持有与条件队列相关的锁
  • 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。

通知

当条件谓词变更为ture时,一定要确保以某种方式对挂机的线程发出通知,条件队列的API中通知的方式有两种:

  • notify:从等待线程中选择一个唤醒
  • notifyAll:唤醒所有等待线程

使用notify而非notifyAll的场景:

  1. 只有一个条件谓词相关
  2. 单进单出:每次只唤醒一个

尽管notify的性能更好,但是一定要符合上诉条件;多个条件谓词相互时,使用notify是一个非常危险的操作

如果唤醒了没有释放锁会怎样?

被唤醒的线程必须重新持有所有才能走后续操作,所以还要等待负责唤醒的线程释放锁。

条件通知和通知有啥区别?

如果没有线程被挂起,通知操作也就没有意义。通知前如果进行是否需要通知的判断,也就成为条件通知。

如当队列从“已满”变成“未满”状态,才发起通知

1
2
3
4
5
6
7
8
9
public synchronized void put(V v) throws InterruptedException
{
while(isFull())
wait();
boolean wasEmpty = isEmpty();//由空变为非空
doPut(v);
if(wasEmpty)
notifyAll();
}

使用通知实现”阀门”:

之前用CountDownLatch实现,只能使用一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThreadGate {
private boolean isOpen;
private int generation;

public synchronized void close() {
isOpen = false;
}

public synchronized void open() {
++generation;
isOpen = true;
notifyAll();
}

public synchronized void await() throws InterruptedException {
int arrivalGeneration = generation;
while (!isOpen && arrivalGeneration == generation)
wait();
}
}

//generation防止打开一瞬间又关闭的行为

子类的安全问题

  • 如果在实施子类化时违背了条件通知或单词通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类
  • 对于状态依赖的类,要么将其等待和通知等协议完全向子类公开(并且写入正式文档),要么完全阻止子类参与到等待和通知等过程中
  • 完全禁止子类化

入口协议和出口协议

  • 入口协议:该操作的条件谓词
  • 出口协议:检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如果是,则通知相关的条件队列

显示的Condition对象

内置的条件队列有什么缺陷?

每个内置锁都只能有一个相关联的条件队列,而多个线程可能在同一条件队列上等待不同的条件谓词,调用notifyAll通知的线程非等待同一类型的谓词

Condition与内置条件队列相比的优势:

在每个锁上可存在多个条件队列、条件等待可以是可中断的或不可中断的、基于时限的等待,以及公平的或非公平的队列操作

Condition 是使用 await()、signal() 和 signalAll()

当使用Condition时,一个Condition和一个Lock关联,一个条件队列和一个条件谓词相关,因而可以使用更高效的 signal() 来实现。

Condition 和 内置条件队列的选择?

  • 需要上诉更高级的功能时,使用Condition

  • 如果使用Lock,就决定使用Condition

同步工具类解析

在ReentrantLock和Semaphore这两个接口之间存在许多共同点。

  • 两个类都可以用作一个”阀门“,即每次只允许一定数量的线程通过
  • 线程到达阀门时,可以通过(在调用lock或acquire时成功返回),也可以等待(在调用lock或acquire时阻塞),还可以取消(在调用tryLock或tryAcquire时返回”假“,表示在指定的时间内锁是不可用的或者无法获取许可)
  • 这两个接口都支持中断、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作

原因:都实现了同一个基类AbstractQueuedSynchronizer(AQS)

可以用 ReentrantLock 实现 Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
>   @ThreadSafe
> public class SemaphoreOnLock {
> private final Lock lock = new ReentrantLock(); //
> // CONDITION PREDICATE: permitsAvailable (permits > 0)
> private final Condition permitsAvailable = lock.newCondition();
> @GuardedBy("lock")
> private int permits;
>
> SemaphoreOnLock(int initialPermits) {
> lock.lock();
> try {
> permits = initialPermits;
> } finally {
> lock.unlock();
> }
> }
>
> // BLOCKS-UNTIL: permitsAvailable
> public void acquire() throws InterruptedException {
> lock.lock();
> try {
> while (permits <= 0) permitsAvailable.await();
> --permits;
> } finally {
> lock.unlock();
> }
> }
>
> public void release() {
> lock.lock();
> try {
> ++permits;
> permitsAvailable.signal();
> } finally {
> lock.unlock();
> }
> }
> }
>

AQS

AQS是一个构建锁和同步器的框架

最基本的操作:

  • 获取操作是一种依赖状态的操作,同步器判断当前状态是否允许获得操作,如不允许并且通常会阻塞
  • 释放并不是一个可阻塞的操作时,当执行“释放”操作时,会更新依赖状态,所有在请求时被阻塞的线程都会开始执行

状态管理(一个整数状态):

  • 通过getState,setState以及compareAndSetState等protected类型方法来进行操作
  • 这个整数在不同子类表示任意状态。例:剩余的许可数量,任务状态
  • 子类可以添加额外状态

获取和释放的标准格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
boolean acquire() throws InterruptedException {
while (state does not permit acquire) {//当前状态不允许获取操作
if (blocking acquisition requested) {//需要阻塞获取请求
enqueue current thread if not already queued;//如果当前线程不在队列中 则将其插入队列
block current thread;//则塞当前线程
} else {
return failure;//返回失败
}
possibly update synchronization state;//可能更新同步器的状态
dequeue thread if it was queued;//如果线程位于队列中 则将其移出队列
return success;//返回成功
}
}

void release() {
update synchronization state ;//更新同步器的状态
if (new state may permit a blocked thread to acquire)//新的状态允许某个被则塞的线程获取成功
unblock one or more queued threads ;//解除队列中一个或多个线程的阻塞状态
}

如何使用AQS自定义同步工具?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 获取操作是否允许
* 独占模式:
* true:获取成功,执行后续逻辑
* false:获取失败,往队列中插入独占类型的节点,阻塞线程
*/
protected boolean tryAcquire(long arg) {
throw new UnsupportedOperationException();
}

/**
* 释放操作是否允许
* 独占模式:
* true:唤醒队首线程
* false:失败
*/
protected boolean tryRelease(long arg) {
throw new UnsupportedOperationException();
}

/**
* 获取操作是否允许
* 共享模式:
* >0:获取成功,被唤醒时同时唤醒后续共享类型的节点
* =0:获取成功,跳出执行后续流程
* <0:获取失败,往队列中插入共享类型的节点,阻塞线程
*/
protected long tryAcquireShared(long arg) {
throw new UnsupportedOperationException();
}

/**
* 释放操作是否允许
* 独占模式:
* true:唤醒队首线程,唤醒过程中如果队首发生变化,继续唤醒
* false:失败
*/
protected boolean tryReleaseShared(long arg) {
throw new UnsupportedOperationException();
}

/**
* 判断是否以独占的方式被当前线程持有
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

使用AQS实现一个简单的案例:二元闭锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@ThreadSafe
public class OneShotLatch {
private final Sync sync = new Sync();

public void signal() {
sync.releaseShared(0);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}

private class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int ignored) {
// Succeed if latch is open (state == 1), else fail
return (getState() == 1) ? 1 : -1;
}

protected boolean tryReleaseShared(int ignored) {
setState(1); // Latch is now open
return true; // Other threads may now be able to acquire

}
}
}

同步工具中AQS的使用

ReentrantLock

额外的逻辑:非公平性、可重入、维护当前获取锁的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//非公平锁的实现
protected boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, 1)) {
owner = current;
return true;
}
}
else if (current == owner) {
setState(c+1);
return true;
}
return false;
}

CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected int tryAcquireShared(int acquires) {
while (true) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected boolean tryReleaseShared(int releases) {
while (true) {
int p = getState();
if (compareAndSetState(p, p + releases))
return true;
}
}

FutureTask

  • 在FutureTask中,AQS同步状态被用来保存任务的状态
  • FutureTask还维护一些额外的状态变量,用来保存计算结果或者抛出的异常

ReentrantReadWriteLock

  • 单个AQS子类将同时管理读取加锁和写入加锁
  • ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数
  • 在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法
  • AQS在内部维护了一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问:写操作独占获取;读操作可使第一个写之前的读都获取

进一步了解,可参考ReentrantReadWriteLock读写锁详解

本文标题:12构建自定义的同步工具

文章作者:Sun

发布时间:2020年09月09日 - 19:09

最后更新:2020年09月10日 - 20:09

原始链接:https://sunyi720.github.io/2020/09/09/Java并发编程/12构建自定义的同步工具/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。