04基础构建模块

基础构建模块

同步容器类

实现方式 : 将他们的状态封装起来,并对每个公有方法都进行同步, 使得每次只有一个线程可以访问.

同步容器类的问题

  1. 客户端在使用同步容器类时,无法保证绝对的线程安全,对复合操作要进行加锁
  2. 线程间串行执行,并发性差

迭代器与 ConcurrentModificationException

并发环境下,客户端使用迭代器进行迭代的过程中,如果其他线程做了修改操作(add,update,remote等),就会报ConcurrentModificationException异常

因此使用迭代器也要加同步

隐藏的迭代器

 toString() , hashCode() , equals() 等很多方法都出触发容器的迭代操作.

并发容器

解决传统同步容器的并发问题

ConcurrentHashMap

  • 加锁策略 : 使用分段锁(粒度更细的加锁机制)代替同步方法块,因此支持多个线程并发的访问 ConcurrentHashMap , 实现更高的吞吐量.
  • ConcurrentHashMap 返回的迭代器具有弱一致性 , 可以(但是不保证)将修改操作立即反映给容器,且迭代过程中不会加锁 , 也不会抛出 ConcurrentModificationException ,
  • 将一些复合操作(如putIfAbsent(), 若没有则添加) 实现为原子操作

CopyOnWriteArrayList

  • 写入时复制,也可理解为修改时复制
  • 返回的迭代器不会抛出 ConcurrentModificationException
  • 使用场景 : 迭代操作远远多于修改操作. 复制数组的操作有一定的开销.
  • 修改操作使用 ReentrantLock 进行加锁

阻塞队列和生产者—消费者模式

  • 提供阻塞的 put 和 take 方法
  • put 方法将阻塞到直到有空间可用 , take 方法将阻塞到直到有元素可用
  • 队列可以有界, 也可以无界
  • 修改容器时统一使用创建队列实例时创建的 ReentrantLock 对象

生产者—消费者模式有什么意义?

  1. 解耦,是的生产和消费成为两个独立的模块,分别开发
  2. 并发,能同时生产和同时消费
  3. 简化工作负载,生产和消费速率不一致的控制

串行线程封闭

使用阻塞队列,可以安全的将线程封闭的对象从生产者交付到消费者。

双端队列和工作密取

什么是工作密取?适用场景?

每个消费者有自己的双端队列 , 当一个消费者完成自己队列的所有任务后 , 那么它可以从其他消费者的双端队列秘密的获取任务 .

适用于自身既是生产者也是消费者的场景:

阻塞方法和中断方法

线程阻塞的原因有哪些?

等待I/O,等待获取锁,Thread.sleep()

阻塞操作与执行时间长的计算操作的区别?

被阻塞的线程必须等待一个不受它控制的事件发生时才能继续执行

中断意味停止?

中断只是标记一个中断状态,需要程序自己去监听判断这个状态

会监听这个状态且抛出 InterruptedException 异常的方法,称为中断方法,如sleep(),wait(),join()等

常抛出后中断状态会被置为ture

由此理解下面的中断相关的方法:

  • interupt():标记中断状态

  • isinterrupted():是否中断

  • interrupted():是否中断,且重置为false

被中断后如何处理?

  1. 将 InterruptedException 传递给方法的调用者
  2. 捕获这个异常,并恢复中断状态为true

同步工具

同步工具类有什么作用?

根据自身状态(某变量)协调线程的控制流

闭锁

有什么用?应用场景?

延迟线程的执行进度,知道达到终止状态(如CountDownLatch 计数为 0 )

使用场景:

  1. 某个操作在其他所有资源初始化后执行
  2. 某个操作在其他所有服务启动后执行
  3. 某个操作在其他参与者准备好后执行

CountDownLatch 使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.pinnet.test;

import java.util.concurrent.CountDownLatch;

public class CountLatchTest {

public void timeTask(int threadNumbers, Runnable task) throws InterruptedException {
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(threadNumbers);

for (int i = 0; i < threadNumbers; i++) {
new Thread() {
public void run() {
try {
// 所有线程在起始门等待
start.await();
// 执行任务
task.run();
// 结束门递减
end.countDown();
} catch (InterruptedException e) {

}
}
}.start();
}
// 所有工作线程开始执行
start.countDown();
// 所有工作线程启动后主线程立即登待
end.await();
System.out.println("开始主线程");

}

}

FutureTask

有什么用?

依赖 Callable ,可以获取执行状态(未执行,执行中,完成),并最终获取结果,或者取消执行

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class FutureTaskTest {
// 创建任务
private final FutureTask<Integer> future = new FutureTask<>(new Callable<Integer>() {
public Integer call() {
return 123;
}
});
// 创建线程
private final Thread thread = new Thread(future);
// 对外提供方法启动线程
public void start() {
thread.start();
}
// 获取计算结果
public Integer get() {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return null;
}
}
}

更多FutureTask可参考FutureTask

信号量

有什么用?

限制同时执行某个操作的数量

使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class BoundedHashSet<T>{
private final Set<T> set;
private final Semaphore semaphore;

public BoundedHashSet(int bound) {
set = Collections.synchronizedSet(new HashSet<T>());
semaphore = new Semaphore(bound);
}

public boolean add(T t) throws InterruptedException{
semaphore.acquire();
boolean wasAdded = set.add(t);
if (!wasAdded) {
semaphore.release();
}
return wasAdded;
}

public boolean remove(T t){
boolean wasRemoved = set.remove(t);
if (wasRemoved) {
semaphore.release();
}
return wasRemoved;
}

}

栅栏

有什么用?

所用线程同时执行某个操作

栅栏和闭锁的区别?

  • 闭锁是一次性对象 , 一旦进入终止状态 , 就不能被重置
  • 栅栏是所有线程必须同时到达栅栏位置 , 才能继续执行.
  • 闭锁用于等待事件 , 栅栏用于等待线程.

使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void barrier() {
int number = 6;
// 参数表示屏障拦截的线程数量
// barrier.await() 调用 number 次后所有线程的阻塞状态解除
CyclicBarrier barrier = new CyclicBarrier(number);

for (int i = 0; i < number; i++) {
new Thread(new Runnable() {

@Override
public void run() {
System.out.println("此线程任务已经完成");
try {
// 调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
barrier.await();
System.out.println("所有线程执行完成");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}
}

构建高效且可伸缩的结果缓存

场景描述:使用 HashMap 并且使用同步块保证线程安全。

问题一:使用同步块串行执行,性能差(甚至不如没有缓存)。

image-20200810203239351

优化一:使用 ConcurrentHashMap 代替同步块,实现并发执行

问题二:客户端中复合操作(先查询后修改)未加锁,可能会导致同一操作(缓存相同)执行多次(若考虑加锁,则等同于问题一)

image-20200810203253004

优化二:需要判断当前计算是否正在执行中,以避免重复执行,可以使用Futrue<>作为缓存(采用先缓存后计算的方式),使得后续操作能够判断出当前缓存正在被计算,等待其完成后返回。

问题三:由于复合操作(先查询后修改)还是未加锁,还是会出现重复计算的问题(只是概率更低)

image-20200810203800211

优化三:使用CAC操作(乐观锁),调用 ConcurrentHashMap.putIfAbscent

后续问题:缓存污染(缓存了执行错误的Futrue)、缓存清理(设置时效)、缓存替换策略(避免缓存过大)

本文标题:04基础构建模块

文章作者:Sun

发布时间:2020年08月10日 - 20:08

最后更新:2020年08月20日 - 10:08

原始链接:https://sunyi720.github.io/2020/08/10/Java并发编程/04基础构建模块/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。