构建自定义的同步工具
状态依赖性的类,操作有基于状态的前提条件:
- 如FutureTask,获取一个任务结果前提条件是任务状态为“任务已完成”;
- 如BlockingQueue,从队列中删除元素前提条件是队列状态为“非空”。
- 如ReentrantLock,获取锁成功的前提条件是锁的状态为“可获取”。
创建条件依赖类最简单的方式:利用现有的同步工具类
基于状态的前提条件与先验条件概念上的区别?
先验条件的目的是验证对象的安全性
获取锁之后,根据前提条件判断能否执行后续操作
状态依赖性管理
单线程和多线程中基于状态的前提条件的区别?
单线程中前提条件失败了,可以直接失败退出
多线程中前提条件可能被其它线程变更,下一刻可能就满足了,因而需要条件等待处理
等待前验状态条件的传统方式:
1 | //可阻塞的状态依赖操作的结构 |
现通过一个有界队列的实现来逐步理解条件等待的演变过程:
1 | //有界缓存实现的基类 |
演变一:将前提条件的失败传递给调用者
1 | public class GrumyBoundedBuffer<V> extends BaseBoundedBuffer<V> { |
调用者在使用时,并不应该把不满足前提条件作为一种异常,所以往往需要自行捕获重试,如下:
1 | while (true){ |
演变二:通过轮询与休眠来实现简单的阻塞
1 | public class SleepyBounedBuffer<V> extends BaseBoundedBuffer<V> { |
这样处理的关键时要选择合适的休眠时间,这是关于响应性和CPU效率的权衡,下图是对响应性的影响:

更好的方式:当方法由于某个条件变更时,需要提供某种取消机制,如处理中断。但是如此处理也有不小的复杂性,我们需要一种更为简单的处理方式。
条件队列
名字来源:使线程能通过某种方式等待特定条件变成真。传统队列中是一个个元素,条件队列中是一个个正在等待相关条件的线程
与每个对象都可以作为一个锁一样,每一个对象可以作为一个条件队列:
- Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并且修改对象的状态
- Object.notify/notifyAll通知被挂起的线程可以重新请求资源执行
与“休眠”的有界队列相比,条件队列没有改变原来的语义。只是在CPU效率、上下文切换、响应性上做了优化。
如果一个功能通过前者无法实现,那通过后者也无法实现。
1 | public class BoundedBuffer<V> extends BaseBoundedBuffer<V> { |
正确的使用条件队列
虽然有许多规则确保正确的使用条件队列,但是编译器或者系统平台并没有强制要求遵循这些规则。(这也是尽量基于现有状态依赖性类来构造程序的原因之一)
条件谓词
要想正确的使用条件队列,关键是找出队列在哪个条件上等待。
什么是条件谓词?
条件谓词是使用条件队列的关键,但是在代码上没有什么API规范去正确使用,因而可能会有一些困惑
条件谓词是状态依赖操作的前提条件,如上诉中的“任务已完成”,“队列非空”,“锁可获取”
条件队列、条件谓词、锁之间额关系?
- 条件谓词包含多个状态变量,状态变量需要用锁保护,因而判断条件谓词需先持有锁。
- 当使用条件队列时,调用者必须持有与条件队列相关的锁。
过早唤醒
一个条件队列和多个条件谓词相关。过早唤醒可能发生于:
- 被其他条件谓词的变更线程唤醒
- 唤醒时条件谓词可能为真,但是重新判断时又可能假了
因此,条件谓词的判断和wait操作必须放在循环中,使用的标准模板如下:
1 | void stateDependentMethod() throws InterruptedException |
当使用条件等待时(如Object.wait(), 或Condition.await()):
- 通常都有一个条件谓词–包括一些对象状态的测试,线程在执行前必须首先通过这些测试
- 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试
- 在一个循环中调用wait
- 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量
- 当调用wait, notify或notifyAll等方法时,一定要持有与条件队列相关的锁
- 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。
通知
当条件谓词变更为ture时,一定要确保以某种方式对挂机的线程发出通知,条件队列的API中通知的方式有两种:
- notify:从等待线程中选择一个唤醒
- notifyAll:唤醒所有等待线程
使用notify而非notifyAll的场景:
- 只有一个条件谓词相关
- 单进单出:每次只唤醒一个
尽管notify的性能更好,但是一定要符合上诉条件;多个条件谓词相互时,使用notify是一个非常危险的操作
如果唤醒了没有释放锁会怎样?
被唤醒的线程必须重新持有所有才能走后续操作,所以还要等待负责唤醒的线程释放锁。
条件通知和通知有啥区别?
如果没有线程被挂起,通知操作也就没有意义。通知前如果进行是否需要通知的判断,也就成为条件通知。
如当队列从“已满”变成“未满”状态,才发起通知
1 | public synchronized void put(V v) throws InterruptedException |
使用通知实现”阀门”:
之前用CountDownLatch实现,只能使用一次
1 | public class ThreadGate { |
子类的安全问题
- 如果在实施子类化时违背了条件通知或单词通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类
- 对于状态依赖的类,要么将其等待和通知等协议完全向子类公开(并且写入正式文档),要么完全阻止子类参与到等待和通知等过程中
- 完全禁止子类化
入口协议和出口协议
- 入口协议:该操作的条件谓词
- 出口协议:检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如果是,则通知相关的条件队列
显示的Condition对象
内置的条件队列有什么缺陷?
每个内置锁都只能有一个相关联的条件队列,而多个线程可能在同一条件队列上等待不同的条件谓词,调用notifyAll通知的线程非等待同一类型的谓词
Condition与内置条件队列相比的优势:
在每个锁上可存在多个条件队列、条件等待可以是可中断的或不可中断的、基于时限的等待,以及公平的或非公平的队列操作
Condition 是使用 await()、signal() 和 signalAll()
当使用Condition时,一个Condition和一个Lock关联,一个条件队列和一个条件谓词相关,因而可以使用更高效的 signal() 来实现。
Condition 和 内置条件队列的选择?
需要上诉更高级的功能时,使用Condition
如果使用Lock,就决定使用Condition
同步工具类解析
在ReentrantLock和Semaphore这两个接口之间存在许多共同点。
- 两个类都可以用作一个”阀门“,即每次只允许一定数量的线程通过
- 线程到达阀门时,可以通过(在调用lock或acquire时成功返回),也可以等待(在调用lock或acquire时阻塞),还可以取消(在调用tryLock或tryAcquire时返回”假“,表示在指定的时间内锁是不可用的或者无法获取许可)
- 这两个接口都支持中断、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作
原因:都实现了同一个基类AbstractQueuedSynchronizer(AQS)
可以用 ReentrantLock 实现 Semaphore
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 >
> public class SemaphoreOnLock {
> private final Lock lock = new ReentrantLock(); //
> // CONDITION PREDICATE: permitsAvailable (permits > 0)
> private final Condition permitsAvailable = lock.newCondition();
> ("lock")
> private int permits;
>
> SemaphoreOnLock(int initialPermits) {
> lock.lock();
> try {
> permits = initialPermits;
> } finally {
> lock.unlock();
> }
> }
>
> // BLOCKS-UNTIL: permitsAvailable
> public void acquire() throws InterruptedException {
> lock.lock();
> try {
> while (permits <= 0) permitsAvailable.await();
> --permits;
> } finally {
> lock.unlock();
> }
> }
>
> public void release() {
> lock.lock();
> try {
> ++permits;
> permitsAvailable.signal();
> } finally {
> lock.unlock();
> }
> }
> }
>
AQS
AQS是一个构建锁和同步器的框架
最基本的操作:
- 获取操作是一种依赖状态的操作,同步器判断当前状态是否允许获得操作,如不允许并且通常会阻塞
- 释放并不是一个可阻塞的操作时,当执行“释放”操作时,会更新依赖状态,所有在请求时被阻塞的线程都会开始执行
状态管理(一个整数状态):
- 通过getState,setState以及compareAndSetState等protected类型方法来进行操作
- 这个整数在不同子类表示任意状态。例:剩余的许可数量,任务状态
- 子类可以添加额外状态
获取和释放的标准格式:
1 | boolean acquire() throws InterruptedException { |
如何使用AQS自定义同步工具?
1 | /** |
使用AQS实现一个简单的案例:二元闭锁
1 |
|
同步工具中AQS的使用
ReentrantLock
额外的逻辑:非公平性、可重入、维护当前获取锁的线程
1 | //非公平锁的实现 |
CountDownLatch
1 | protected int tryAcquireShared(int acquires) { |
FutureTask
- 在FutureTask中,AQS同步状态被用来保存任务的状态
- FutureTask还维护一些额外的状态变量,用来保存计算结果或者抛出的异常
ReentrantReadWriteLock
- 单个AQS子类将同时管理读取加锁和写入加锁
- ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数
- 在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法
- AQS在内部维护了一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问:写操作独占获取;读操作可使第一个写之前的读都获取
进一步了解,可参考ReentrantReadWriteLock读写锁详解