21 并发

并发

并发的优势是?

  1. 执行速度极大提高
  2. 为设计类型的程序提供更加易用的模型

并发的问题

当并发执行的任务开始产生交互的时候,一些不可预料的问题就会发生。因此,伴随着并发好处的同时会有一大堆的问题产生。

并发的多面性

什么叫做多面性?

需要并发处理的情况有很多,实现并发的方式也有很多,但是他们并不是一一对应的关系,也就是说,没有一个确切的解决方案,需要随机应变。
并发解决的问题大致可以分为两类:“速度”和“设计可管理性”

速度

如何解决了速度的问题?

对于多处理器,把任务分段在多个处理器上同时执行,很明显是变快了,这很好理解。
但是,并发更多的是处理单处理器上的速度问题,这似乎有点违背常理:在单处理器上,并发处理需要上下文切换,似乎开销比顺序执行的还要大。
让并发存在单处理上变得不可缺少的一个原因是“阻塞”。

什么是阻塞?

当程序因为某个控制范围之外(通常是I/O)的条件而停止了,整个程序都要暂停,也就是阻塞。
如果有并发,我们可以说只是一个任务暂停了,其它的任务还可以执行。

如何处理阻塞问题?

  • 无并发的处理方案:在代码中利用循环周期性的检查阻塞的状态。问题是:

    1. 代码会非常丑陋
    2. 无法确保程序员不忘记这种检查
  • 使用进程实现并发:进程之间的任务互不干涉,若某个进程被阻塞了,执行下一个任务,这样的并发程序是没有风险的。

进程实现并发的问题?

进程有数量和开销的限制。

其它的并发实现?

  • 使用并发任务彼此隔离的语言(譬如Erlang):这类语言和进程并发类似,而且减少了进程的限制。
  • 在顺序性语言(JAVA)上提供了对线程的支持

改进代码设计

如何解决了代码设计管理的问题?

比如仿真设计,在游戏中,每一个角色看起来都是独立的,互相执行的不同的任务,这就需要多线程的设计方式了,使用单线程很难处理这个问题。

JAVA 线程基本使用

  1. 先定义任务,再把任务赋予到一个线程上驱动,案例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    class LiftOff implements Runnable {
    protected int countDown = 10;
    private static int taskCount = 0;
    private final int id = taskCount++;

    public LiftOff() {
    }

    public LiftOff(int countDown) {
    this.countDown = countDown;
    }

    public String status() {
    return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), ";
    }

    @Override
    public void run() {
    while (countDown-- > 0) {
    System.out.print(status());
    Thread.yield();
    }
    }
    }
    public class ThreadTest {
    public static void main(String[] args) {
    for (int i = 0; i < 5; i++) {
    new Thread(new LiftOff()).start();
    }
    System.out.println("Waiting for LiftOff");
    }
    }

每一次执行的结果都不一样。
Thread.yield()是让线程调度器重新分配任务。

  1. 继承Thread,定义一个具有特定任务的线程。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class SimpleThread extends Thread {
    private int countDown = 5;
    private static int threadCount = 0; 814
    Thinking in
    JavaBruce Eckel

    public SimpleThread() { // Store the thread name
    super(Integer.toString(++threadCount));
    start();
    }

    public String toString() {
    return "#" + getName() + "(" + countDown + "), ";
    }

    public void run() {
    while (true) {
    System.out.print(this);
    if (--countDown == 0) return;
    }
    }
    }

在构造器中启动线程并不是一个很好的选择。

需要理解的一个思想是:

Runnable存在的意义更多的是一个“任务拥有者”的抽象,里面的run()表示一个“可执行的任务”
Thread存在的意义才贴近系统上的线程,可以看做是一个“任务执行者”,具有开始执行任务的方法start()。
Thread实现Runnable表示Thread也是“任务拥有者”,Thread(Runnable)的含义是用一个新的任务去覆盖Thread本身的默认任务。
Thread中持有target来保存任务拥有者,如果任务拥有者不为空,会调用任务拥有者的方法。
把任务和线程的概念抽离出来,可以更好的处理任务的创建,和对线程运行的理解。

上面的实现方法有什么问题?

  1. 每一个任务都需要创建一个线程,在物理上创建线程的代价是比较大的,因此我们必须管理好线程。
  2. 没有返回

如何管理线程?

使用JAVA5的Executor
Executor的实现使用了命令模式,把所有的Runable作为一个任务,采用不同的策略来分配线程执行。
也使用了享元模式,用来操作线程的缓存。

通过查看Thread中的类,可以发现,Thread并没有修改target的方法,那么,线程如何重复利用?

实际上,虽然任务拥有者无法被改变,但是任务拥有者拥有的任务是可以被改变的。Executor中的Worker类就是一类可以把其它Runnable中的任务,接手过来,以此实现线程的重复利用的。

什么是命令模式?

  • Command:定义命令的接口,声明执行的方法。
  • ConcreteCommand:命令接口实现对象,是“虚”的实现;通常会持有接收者,并调用接收者的功能来完成命令要执行的操作。
  • Receiver:接收者,真正执行命令的对象。任何类都可能成为一个接收者,只要它能够实现命令要求实现的相应功能。
  • Invoker:要求命令对象执行请求,通常会持有命令对象,可以持有很多的命令对象。这个是客户端真正触发命令并要求命令执行相应操作的地方,也就是说相当于使用命令对象的入口。
  • Client:创建具体的命令对象,并且设置命令对象的接收者。注意这个不是我们常规意义上的客户端,而是在组装命令对象和接收者,或许,把这个Client称为装配者会更好理解,因为真正使用命令的客户端是从Invoker来触发执行。

下面的链接有详细的介绍:
[命令模式]https://my.oschina.net/xianggao/blog/618809

我的理解是,把请求抽象成命令,然后可以管理这些命令:

  1. 可以把各种请求抽象成命令对象来处理。
  2. 可撤销,让所有的命令都支持可撤销的方法。
  3. 宏命令,一个命令的集合,批量执行。
  4. 命令队列,采取多线程的方式来执行命令。

Executor的命令模式?

Runable类就是命令的抽象
Executor是Invoker,执行对命令的管理和执行。

常见的3种Executor子类有:

  1. CachedThreadPool:这个实例会根据需要,在线程可用时,重用之前构造好的池中线程。如果不存在可用线程,那么会重新创建一个新的线程并将其加入到线程池中。如果线程超过60s(可设置)还未被使用,就会被中止并从缓存中移除。因此,线程池在长时间空闲后不会消耗任何资源。
  2. FixedThreadPool:这个实例会复用固定数量的线程处理一个共享的无边界队列。有多的任务被提交过来,那么它会一致在队列中等待直到有线程可用。如果任何线程在执行过程中因为错误而中止,新的线程会替代它的位置来执行后续的任务。所有线程都会一致存于线程池中,直到显式的执行ExecutorService.shutdown() 关闭。
  3. 这个实例只会使用单个工作线程来执行一个无边界的队列。它可以保证认为是按顺序执行的,任何时候都不会有多于一个的任务处于活动状态。

如何解决返回值的问题?

实现带有泛型的Callable接口而不是Runnable接口,实现call()方法。
配合Exceutor使用的话,必须使用ExecutorService.submit()方法调用。

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class TaskWithResult implements Callable<String> {
private int id;

public TaskWithResult(int id) {
this.id = id;
}

public String call() {
return "result of TaskWithResult " + id;
}
}

public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results = new ArrayList<Future<String>>();
for (int i = 0; i < 10; i++) results.add(exec.submit(new TaskWithResult(i)));
for (Future<String> fs : results)
try { // get() blocks until completion:
System.out.println(fs.get());
} catch (InterruptedException e) {
System.out.println(e);
return;
} catch (ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
}

因为我们不知道线程在什么时候调用完成,所以返回值被Future封装了,使用isDone()查询Future是否已经完成,完成后可以使用get()方法获取返回值。

之前我们可以看到,线程的执行完全是随机的,和底层的实现机制有关,如何控制这一执行的顺序?

如何控制线程执行的顺序?

设置线程优先级,Thread.currentThread().setPriority(priority);
使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SimplePriorities implements Runnable {
private int countDown = 5;
private volatile double d; // No optimization private int priority;

public SimplePriorities(int priority) {
this.priority = priority;
}

public String toString() {
return Thread.currentThread() + ": " + countDown;
}

public void run() {
Thread.currentThread().setPriority(priority);
while (true) { // An expensive, interruptable operation:
for (int i = 1; i < 100000; i++) {
d += (Math.PI + Math.E) / (double) i;
if (i % 1000 == 0) Thread.yield();
}
System.out.println(this);
if (--countDown == 0) return;
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
exec.execute(new SimplePriorities(Thread.MIN_PRIORITY));
exec.execute(new SimplePriorities(Thread.MAX_PRIORITY));
exec.shutdown();
}
}

注意,优先级的设置最好放到run()方法中,因此可以保证当前任务已经开始执行。

优先级的作用:中断开销比较大的低优先级线程,让高优先级线程先执行。并不是说高优先级的线程就一定先执行。

后台线程

什么是后台线程?

指在程序运行的时候再后台提供一种通用的服务,并且这种线程并不属于程序中不可或缺的部分,一旦非后台线程结束了,所有的后台程序也就结束了。

使用方法:

1
2
3
Thread daemon = new Thread(new SimpleDaemons());       
daemon.setDaemon(true); // Must call before start()
daemon.start();

线程启动前设置
后台线程中创建的线程都是后台线程。
后台线程中的finally{}语句有可能不会被执行。

对于Executor管理的线程,如何设置?

每一个静态的ExecutorService都可以接受一个ThreadFactory的工厂

1
2
3
4
5
6
7
public class DaemonThreadFactory implements ThreadFactory {   
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}

对于线程的基本介绍到这,现在来解决第一个问题:如何解决阻塞问题?

先看一个阻塞问题:

1
2
3
4
5
6
7
8
class UnresponsiveUI {   
private volatile double d = 1;
public UnresponsiveUI() throws Exception {
while(d > 0)
d = d + (Math.PI + Math.E) / d;
System.in.read(); // Never gets here
}
}

使用多线程解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ResponsiveUI extends Thread {
private static volatile Long d = 1L;

public ResponsiveUI() {
setDaemon(true);
start();
}

@Override
public void run() {
while (true) {
d++;
}
}

public static void main(String[] args) throws Exception {
//! new UnresponsiveUI(); // Must kill this process
new ResponsiveUI();
System.in.read();
// Shows progress
System.out.println(d);
}
}

线程的异常

每一个线程的异常都需要在run()方法中处理,一旦异常被抛出到线程外,都会打印到控制台。
案例:

1
2
3
4
5
6
7
8
9
public class ExceptionThread implements Runnable {   
public void run() {
throw new RuntimeException();
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
}
}

结果是控制台打印出异常,即便是runtime异常,即便在main()中加上try-catch语句也没用。
可是有时候如果不希望某个线程的异常影响到其它线程的执行,我也不想在run()做一些没必要的异常处理,怎么办?

JAVA5增加了Thread.UncaughtExceptionHandler接口,可以使用Thread.setUncaughtExceptionHandler()的来设置对特定线程死亡时未处理的异常的处理。
也可以使用setDefaultUncaughtExceptionHandler()来设置默认的处理器。

1
2
3
4
5
6
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {   
public void uncaughtException(Thread t, Throwable e)
{
System.out.println("caught " + e);
}
}

配合Exceutor使用的话就是在ThreadFactory的实现中调用Thread.setUncaughtExceptionHandler()方法。

共享受限的资源

线程共享资源的问题?

一个线程访问到了另一个线程中尚未处理完成的中间变量结果,导致逻辑上出现错误。
主导原因是由于 两个线程的代码发生交叉执行。
访问到中间状态值才会出现错误,访问到结果状态值不算错误。

如何解决

思路是防止访问到其它线程中共享资源中间状态值
使用同步机制:

  1. 互斥机制:synchronized,lock
  2. 原子操作

字段设为private的好处是?

只允许在方法中去访问字段,然后给方法加锁来控制对资源的访问。

方法前加synchronized是给什么加锁?

是给当前的对象或者是类,这点也就意味着一个对象中有多个synchronized方法被调用,即便不是同一个方法,只一个方法调用结束了,另一个方法才能被调用。
被锁上的“对象”都会有一个对应的计数器,只有当锁的计数器为0的时候,某资源才会被访问。譬如上述中一个对象中有3个synchronized方法被调用,当前的计数器就是3。

上面的锁机制称为互斥机制:一段时间只有一个任务可以运行这段代码。

使用synchronized和使用lock的区别?

  1. lock具有更细粒度,可以处理关键处的代码,没有必要锁上整个代码。
  2. lock可以处理异常,synchronized报错了只能返回错误
  3. lock具有更多的锁控制方法,比如tryLock()可以设置获取锁和等待锁的时间。

什么时候需要进行同步处理?

当一个线程需要读取其它线程中修改的变量的时候。所有读取改变量的方法都要加同步处理。

这里必须强调所有,一些加一些不加,也无法产生正常的结果。

什么是原子操作和原子性?

一个不可被线程调度中断的操作被称为原子操作。这种不可分割的特性叫做原子性。
理论上说,如果一个线程只执行一个原子操作,就不会有不稳定的中间态,是不是就没有资源共享的问题了?

理解上是正确的,但是使用起来非常危险,会有以下的几个问题:

  1. 如何确实是一个原子操作?
  2. 多核处理器的情况下还会有可见性的问题。
  3. 使用synchronized保证的原子性只对synchronized方法有效

    使用synchronized可以保证原子性

什么属于原子操作?

基本类型都具有原子性(除long和double之外),对其的读取和赋值操作,返回操作是原子操作。
对于long和double(64字节),jvm可能会分为两个32字节的指令来处理,这时候也就不是原子操作了。
队员这种基本数据类型的简单操作,可以使用volatile来保证原子性。

什么是可见性问题?

在多核处理器中,每一个处理器刻都会有一个本地缓存,导致数据没有写入到内存中,也就无法被其它线程读取,这就是可见性的问题。
可以使用volatile来保证字段的可见性。

使用synchronized可以保证可见性

如何理解使用synchronized保证的原子性只对synchronized方法有效?

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  public class AtomicityTest implements Runnable {
private int i = 0;
public int getValue() {
return i;
}
private synchronized void evenIncrement() {
i++;
i++;
}
public void run() {
while (true) evenIncrement();
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
AtomicityTest at = new AtomicityTest();
exec.execute(at);
while (true) {
int val = at.getValue();
if (val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}

上面的结果是会出现奇数值,原因是因为synchronized只保证和synchronized操作之间的原子性

理论上说,使用原子操作确实可以代替锁来实现同步,但是由于各种各样的原因,使用原子操作是一个非常危险的尝试,如果不是专家,请不要轻易尝试。
如果情非得已需要使用原子操作,JAVA5提供了原子类来帮助大家避免危险。

原子类

AtomicInteger,AtomicLong,AtomicReference等atomic包下的类来提供原子操作。

比如Random中就使用到了原子类来保证线程安全性:

1
2
3
4
5
6
7
8
9
protected int next(int bits) {
long oldseed, nextseed;
AtomicLong seed = this.seed;
do {
oldseed = seed.get();
nextseed = (oldseed * multiplier + addend) & mask;
} while (!seed.compareAndSet(oldseed, nextseed));
return (int)(nextseed >>> (48 - bits));
}

也由此可知,Random并不是真正的随机,而是根据初始参数为种子(如果未传以当前的时间为种子)来逐个生成数值:

1
2
3
4
5
6
7
8
public class RanomTest {
public static void main(String arg[]) {
Random random = new Random(47);
for (int i = 1; i <= 10; i++) {
System.out.println(random.nextInt(100));
}
}
}

无论运行多少次,结果都是一样的。因此,Random在多线程中最好作为static方法来使用,因此它是线程安全的。

同步代码块

synchronized(互斥量){},相比synchronized方法在性能上快很多,但还是没有lock功能强大。

线程本地存储

除了使用同步机制以外,还有第二种方法可以解决共享资源产生冲突的问题,那就是根除对变量的共享。

如何使用线程本地存储?

使用ThreadLocal类

中断

单线程的阻塞可以用多线程来解决,但是多线程的情况下也会出现阻塞,比如等待另一个线程的资源的释放。

线程都有哪些状态?

  1. 新建:新建的线程所处的短暂的一个状态,等待调度器把状态转变成就绪或者阻塞。
  2. 就绪:有时间片就可以执行
  3. 阻塞:某个条件阻止线程执行
  4. 死亡:线程停止执行,不会被调度器调度。

导致线程进入阻塞状态的原因?

  1. sleep()
  2. wait()
  3. 等待I/O
  4. 无法获取锁

有时间不希望线程无休止的阻塞下去,怎么中断?

使用interrupt()
应该注意的是,线程是否中断应该由自己来决定,interrupt()只是“给一个建议”,某些情况下,强制中断会带来很严重的后果。
因此并不是所有的阻塞状态都能被中断的,比如IO阻塞(这里指传统的IO)和锁阻塞(Lock类具有可中断的锁ReentrantLock)就属于不可中断的阻塞状态。

线程如何响应中断?

interrupt()是设置线程的一个中断信号,有两种响应的结果。

  1. 处于可中断的阻塞状态的线程接收到中断信号后会抛出InterruptedException异常,随后重置中断信号。
  2. 对于没有阻塞状态的线程不会响应这个中断信号,但是可以使用interrupted()来检查中断信号,这个方法会返回当前的中断信号,并且随后重置中断信号,因此也可以通过判断interrupted来。

中断发生在阻塞前会是什么结果?

线程协作

为什么wait()的持有者是Object而不是Thread?

我的理解是:并不是由线程来决定是否能拥有锁,而应该是由锁来决定哪一个线程可以获取到它,因此,是锁决定挂起或者唤醒一个线程,而不是由线程决定自己是否被挂起或者挂起,所以持有wait()和notify()的对象应该是锁,而所有的对象都可以是锁,因而ait()的持有者是Object,而不是Thread。
需要注意的是wait(),notify()等的使用需要在同步代码中,否则会抛出异常。

如何控制wait()发生在notify()之前?

把wait()放在某条件语句(线程A中)中,让wait()发生, 在notify()前后(线程B)中控制wait()方法被唤醒的条件。

为什么wait()要放在while中?

因为被唤醒不等于立刻就获得了锁开始执行,有可能另一被唤醒的线程获得了锁,并且再次改变(达到本线程中)了被挂起的条件,这时候如果本线程还要继续执行,就会出现问题。

Lock的线程协作方式使用

每一个Lock实例都可以使用newCondition()来获取Condition对象,这个Condition对象具有await(),signal()和signalAll方法来控制线程协作。

生产者和消费者

生产者和消费者问题是一个典型的线程协作和案例:
生产者是生产资源的任务拥有者。
消费者是消费资源的任务拥有者。

阻塞队列

当消费和生产的资源上限只有一个的时候,性能上会受到很大的局限。
concurrent包下面的BlockingQuene帮我们创建了一个强大而使用简单的资源队列的使用:

  1. BlockingQuene中的入队和出对操作都会加锁,所以线程的其它的地方不加锁也不会出现资源共享的问题。
  2. 当BlockingQuene为空的时候,会挂起消费者。当BlockingQuene满的时候会挂起生产者。

线程间的输入和输出

可以使用管道流实现,一个进程写入,另一个进程读出,没有涉及到资源共享的问题。
实际上,管道流是BlockingQuene出现之前的一种线程间安全交流的代替方法。

死锁

所有的线程都在等待某个条件并且一直等待下去,就是死锁,死锁往往是出乎意料的发生的。

经典的死锁案例——哲学家就餐问题:
基本描述指定了五位哲学家.这些哲学家把他们的一部分时间花在思考上,把一部分时间花在吃饭上。当他们思考的时候,他们不需要任何共享的资源,但是他们使用有限数量的器皿吃饭。在最初的问题描述中,餐具是叉子,需要两把叉子才能从桌子中间的碗中取出意大利面,但说餐具是筷子似乎更有意义。显然,每个哲学家都需要两把筷子才能吃饭。
作为哲学家,他们的钱很少,所以他们只能买得起五只筷子(更普遍地说,筷子的数量与哲学家的数量相同)。它们之间隔着桌子。当一个哲学家想吃东西时,那个哲学家必须拿起左手的筷子和右边的筷子。如果两边的哲学家都在用想要的筷子,我们的哲学家必须等到必要的筷子可用为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class Chopstick {
private boolean taken = false;
public synchronized void take() throws InterruptedException {
while (taken) wait();
taken = true;
}
public synchronized void drop() {
taken = false;
notifyAll();
}
}

public class Philosopher implements Runnable {
private Chopstick left;
private Chopstick right;
private final int id;
private final int ponderFactor;
private Random rand = new Random(47);

private void pause() throws InterruptedException {
if (ponderFactor == 0) return;
TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250));
}

public Philosopher(Chopstick left, Chopstick right, int ident, int ponder) {
this.left = left;
this.right = right;
id = ident;
ponderFactor = ponder;
}

public void run() {
try {
while (!Thread.interrupted()) {
print(this + " " + "thinking");
pause(); // Philosopher becomes hungry
print(this + " " + "grabbing right");
right.take();
print(this + " " + "grabbing left");
left.take();
print(this + " " + "eating");
pause();
right.drop();
left.drop();
}
} catch (InterruptedException e) {
print(this + " " + "exiting via interrupt");
}
}

public String toString() {
return "Philosopher " + id;
}
}

public class DeadlockingDiningPhilosophers {
public static void main(String[] args) throws Exception {
int ponder = 5;
if (args.length > 0) ponder = Integer.parseInt(args[0]);
int size = 5;
if (args.length > 1) size = Integer.parseInt(args[1]);
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
for (int i = 0; i < size; i++) sticks[i] = new Chopstick();
for (int i = 0; i < size; i++) exec.execute(new Philosopher(sticks[i], sticks[(i + 1) % size], i, ponder));
if (args.length == 3 && args[2].equals("timeout")) TimeUnit.SECONDS.sleep(5);
else {
System.out.println("Press ‘Enter’ to quit");
System.in.read();
}
exec.shutdownNow();
}
}

死锁的情况是:每一个哲学家都持有一双筷子,等待获取另一双筷子。

死锁发生的条件是什么?

  1. 有互斥的条件,就是存在只能被一个线程使用的资源。如一个筷子只能被一个哲学家使用。
  2. 至少有一个任务持有一个资源且等待一个被别的任务持有的资源。如哲学家持有一根筷子,等待另一个哲学家手中的筷子。
  3. 线程不能抢占资源。
  4. 必须有循环等待。如哲学家永远都是试图先去获取右手的筷子,之后再获取左手的筷子。

如何防止死锁?

破坏上面的任何一个条件即可。最容易的方法是破坏第四个,比如让其中一个哲学家颠倒一下获取筷子的顺序。

破坏其它条件的解决方案请参考更高级的讨论线程的书籍

concurrent库下的新功能

CountDownLatch

用来同步一个或者多个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
CountDownLatch(int count)
构造一个用给定计数初始化的 CountDownLatch。

// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
void await()
// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
boolean await(long timeout, TimeUnit unit)
// 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
void countDown()
// 返回当前计数。
long getCount()
// 返回标识此锁存器及其状态的字符串。
String toString()

CyclicBarrier

让多个线程在同一起跑线等待,然后分回合的并行执行。
下面是一个赛马的比赛,每一回合马前进1到3步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;

public Horse(CyclicBarrier b) {
barrier = b;
}

public synchronized int getStrides() {
return strides;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
// Produces 0, 1 or 2
strides += rand.nextInt(3);;
}
barrier.await();
}
} catch (InterruptedException e) {
// A legitimate way to exit
} catch (BrokenBarrierException e) {
// This one we want to know about
throw new RuntimeException(e);
}
}

@Override
public String toString() {
return "Horse " + id + " ";
}

public String tracks() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < getStrides(); i++) {
s.append("*");
}
s.append(id);
return s.toString();
}
}

class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;

public HorseRace(int nHorses, final int pause) {
barrier = new CyclicBarrier(nHorses, new Runnable() {
@Override
public void run() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < FINISH_LINE; i++) {
s.append("=");
}
// The fence on the racetrack
System.out.println(s);
for (Horse horse : horses) {
System.out.println(horse.tracks());
}
for (Horse horse : horses) {
if (horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + "won!");
exec.shutdownNow();
return;
}
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
System.out.println("barrier-action sleep interrupted");
}
}
});
for (int i = 0; i < nHorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}

public static void main(String[] args) {
int nHorses = 7;
int pause = 10;
new HorseRace(nHorses, pause);
}
}

CyclicBarrier的构造器接受两个变量,第一个指定计数器,第二个指定一个栅栏任务。
当await()的线程到达指定的数量的时候,执行栅栏任务。

DelayQuene

BlockingQueue的一个变种,任务以“延迟”来排序,优先执行即将到期的任务。
队列中的资源对象需要实现Delayed接口,实现getDelay()方法。
Delayed又继承了Complarable接口,因此又需要实现compareTo()方法。
案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence = new ArrayList<DelayedTask>();

public DelayedTask(int delayInMilliseconds) {
delta = delayInMilliseconds;
trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
}

@Override
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask) arg;
if (trigger < that.trigger) {
return -1;
}
if (trigger > that.trigger) {
return 1;
}
return 0;
}

@Override
public void run() {
System.out.print(this + " ");
}

@Override
public String toString() {
return String.format("[%1$-4d]", delta) + " Task " + id;
}

public String summary() {
return "(" + id + ":" + delta + ")";
}

public static class EndSentinel extends DelayedTask {
private ExecutorService exec;

public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}

@Override
public void run() {
for (DelayedTask pt : sequence) {
System.out.println(pt.summary() + " ");
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}

class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;

public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
q.take().run();
}
// Run task with the current thread
} catch (InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished DelayedTaskConsumer");
}
}

class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
// Fill with tasks that have random delays:
for (int i = 0; i < 20; i++) {
TimeUnit.SECONDS.sleep(1L);
queue.put(new DelayedTask(rand.nextInt(5000)));
}
// Set the stopping point
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}

上面EndSentinel是一个终结任务,作清理工作。
TimeUtil下面的枚举类有一个convert()方法,可以很方便的做时间单位上的转换。
如果队列为空了,那么将会返回null。

PriaorityBlockingQuene

自定义优先级的BlockingQuene,需要队列中的资源实现Comparable接口。

ScheduledExecutor

定时和周期性的执行线程。

1
2
3
4
5
6
7
8
9
10
11
// 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)

// 创建并执行在给定延迟后启用的一次性操作。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

// 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

// 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

Semaphore

锁在任何时刻只允许一个任务访问一个资源,而信号量允许N个任务同时访问这个资源。比如“线程池”的实例。
使用方法:

1
2
3
4
5
6
7
8
//指定信号量,默认非公平
public Semaphore(int permits)
//指定信号量和是否公平
public Semaphore(int permits, boolean fair)
//获取信号量,获取不到则阻塞
public void acquire()
//释放信号量
public void release()

Exchanger

交换两个线程的对象数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class Car extends Thread {
private Exchanger<String> exchanger;

public Car(Exchanger<String> exchanger) {
super();
this.exchanger = exchanger;
}

@Override
public void run() {
try {
System.out.println(1 + ": " + exchanger.exchange("Car"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Bike extends Thread {
private Exchanger<String> exchanger;

public Bike(Exchanger<String> exchanger) {
super();
this.exchanger = exchanger;
}

@Override
public void run() {
try {
System.out.println(2 + ": " + exchanger.exchange("Bike"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Car car = new Car(exchanger);
Bike bike = new Bike(exchanger);
car.start();
bike.start();
System.out.println("Main end!");
}
}

仿真

  1. 类似:生活中适当调整工作人员数量来服务随机到来的人群。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class Customer {
private final int serviceTime;

public Customer(int tm) {
serviceTime = tm;
}

public int getServiceTime() {
return serviceTime;
}

@Override
public String toString() {
return "[" + serviceTime + "]";
}
}

class CustomerLine extends ArrayBlockingQueue<Customer> {
public CustomerLine(int maxLineSize) {
super(maxLineSize);
}

@Override
public String toString() {
if (this.size() == 0) {
return "[Empty]";
}
StringBuilder result = new StringBuilder();
for (Customer customer : this) {
result.append(customer);
}
return result.toString();
}
}

/**
* 消费者生成器, 随机的生成到来的人,放在CustomerLine中
*/
class CustomerGenerator implements Runnable {
private CustomerLine customers;
private static Random rand = new Random(47);

public CustomerGenerator(CustomerLine cq) {
customers = cq;
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(300));
customers.put(new Customer(rand.nextInt(1000)));
}
} catch (InterruptedException e) {
System.out.println("CustomerGenerator interrupted");
}
System.out.println("CustomerGenerator terminating");
}
}

class Teller implements Runnable, Comparable<Teller> {
private CustomerLine customers;
private static int counter = 0;
private final int id = counter++;

private int customersServed = 0;
private boolean servingCustomerLine = true;

public Teller(CustomerLine cq) {
customers = cq;
}

/**
* 开始服务
*/
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Customer customer = customers.take();
TimeUnit.MILLISECONDS.sleep(customer.getServiceTime());
synchronized (this) {
customersServed++;
while (!servingCustomerLine) {
wait();
}
}
}
} catch (InterruptedException e) {
System.out.println(this + "interrupted");
}
System.out.println(this + "terminating");
}

/**
* 让柜台人员去做其他事,停止服务
*/
public synchronized void doSomethingElse() {
customersServed = 0;
servingCustomerLine = false;
}

/**
* 让柜台人员开始服务
*/
public synchronized void serveCustomerLine() {
assert !servingCustomerLine : "already serving: " + this;
servingCustomerLine = true;
notifyAll();
}

@Override
public String toString() {
return "Teller " + id + " ";
}

public String shortString() {
return "T" + id;
}

@Override
public synchronized int compareTo(Teller other) {
return customersServed < other.customersServed ? -1 : (customersServed == other.customersServed ? 0 : 1);
}
}

class TellerManager implements Runnable {
private ExecutorService exec;
private CustomerLine customers;
private PriorityQueue<Teller> workingTellers = new PriorityQueue<Teller>();
private Queue<Teller> tellersDoingOtherThings = new LinkedList<Teller>();
private int adjustmentPeriod;
private static Random rand = new Random(47);

public TellerManager(ExecutorService e, CustomerLine customers, int adjustmentPeriod) {
exec = e;
this.customers = customers;
this.adjustmentPeriod = adjustmentPeriod;
// Start with a single teller:
Teller teller = new Teller(customers);
exec.execute(teller);
workingTellers.add(teller);
}

/**
* 调整柜台人员的数量
*/
public void adjustTellerNumber() {
// This is actually a control system. By adjusting
// the numbers, you can reveal stability issues in
// the control mechanism.
// If line is too long, add another teller:
if (customers.size() / workingTellers.size() > 2) {
// another job, bring one back:
if (tellersDoingOtherThings.size() > 0) {
Teller teller = tellersDoingOtherThings.remove();
teller.serveCustomerLine();
workingTellers.offer(teller);
return;
}
Teller teller = new Teller(customers);
exec.execute(teller);
workingTellers.add(teller);
return;
}
if (workingTellers.size() > 1 && customers.size() / workingTellers.size() < 2) {
reassignOneTeller();
}
// If there is no line, we only need one teller:
if (customers.size() == 0) {
while (workingTellers.size() > 1) {
reassignOneTeller();
}
}
}

/**
* 让已服务人数最少的柜台人员休息
*/
private void reassignOneTeller() {
Teller teller = workingTellers.poll();
teller.doSomethingElse();
tellersDoingOtherThings.offer(teller);
}

/**
* 周期性的打印当前服务信息
*/
@Override
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(adjustmentPeriod);
adjustTellerNumber();
System.out.print(customers + " { ");
for (Teller teller : workingTellers) {
System.out.print(teller.shortString() + " ");
}
System.out.println("}");
}
} catch (InterruptedException e) {
System.out.println(this + "interrupted");
}
System.out.println(this + "terminating");
}

@Override
public String toString() {
return "TellerManager ";
}
}

class BankTellerSimulation {
static final int MAX_LINE_SIZE = 50;
static final int ADJUSTMENT_PERIOD = 100;

public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
// If line is too long, customers will leave:
CustomerLine customers = new CustomerLine(MAX_LINE_SIZE);
exec.execute(new CustomerGenerator(customers));
// Manager will add and remove tellers as necessary:
exec.execute(new TellerManager(exec, customers, ADJUSTMENT_PERIOD));
if (args.length > 0)
// Optional argument
{
TimeUnit.SECONDS.sleep(new Integer(args[0]));
} else {
System.out.println("Press ‘Enter’ to quit");
System.in.read();
}
exec.shutdownNow();
}
}

性能调优

  • 性能:原子操作>lock>synchronized
  • 风险:原子操作>lock>synchronized
  • 可读性:synchronized>lock>原子操作

如何确定同步块包含的范围?

需要包含共享资源被修改的整个过程

容器的性能进化

  • java1:类似Vector和Hashtable之类的类,具有许多synchronized的方法,当他们用于非多线程的应用程序中,会导致不可接受的开销。
  • java2:新容器都是不同步的,而且Collections中提供了各种同步的内部容器类,这时候的问题是在多线程的情况下,锁实现的开销依然不小。
  • java5:添加的专门用于处理同步的免锁容器,采用了更加灵巧的技术实现。

免锁容器采用了什么技术?

只对修改操作加锁操作,为了防止读取操作读到不同的线程的中间状态值,修改是在容器数据结构的某个部分的一个单独的副本上执行的,这个副本在修改的过程中是不可视的,当修改完成的时候,一个原子性的操作将把新的数组换入。

为什么要采用这种处理方法?

  1. 解决了并发时的安全和性能问题,当修改操作较少的时候很明显。
  2. 解决了多个迭代器同时修改和遍历的问题

什么是多个迭代器同时修改和遍历的问题?

无论是传统线程安全的Vector,还是新容器Collection,又或者安全的新容器synchronizedCollection都没有解决 迭代器同时修改和遍历 的问题。
这个问题似乎是设计师在设计的时候 就不希望通过原来的容器类中的方法去修改容器的结构,而是通过iterator里面的方法操作。
在代码中的体现就是下面这一段:

1
2
3
4
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}

modCount表示容器实际被修改(add,remove,set)的次数。
expectedModCount是在迭代器中记录的被修改的次数。
因此,通过容器类中的方法修改后会改变modCount的值,而不会影响expectedModCount的值。
最终导致异常的产生。

如何防止ConcurrentModificationException异常的发生?

单线程下使用iterator里面的方法操作,remove里有以下的一段代码:

1
expectedModCount = modCount;

多线程下使用iterator也无效,为什么?

  1. expectedModCount是成员变量,线程独享的。
  2. 多线程下非安全的容器会有额外的问题,如下:
    1
    2
    if (i >= elementData.length)
    throw new ConcurrentModificationExcept();

多线程下如何解决?

  1. 迭代器加锁进行同步,使得两个迭代器的遍历不会同时发生。
  2. 回到最初的介绍——使用免锁容器:CopyOnWriteArrayList,CopyOnWriteArraySet,synchronizedHashMap ,ConcurrentHashMap。这就理解了为什么免锁容器要采用这样的技术了。

注意了,问题依然是存在的!我们继续分析:

  1. 使用迭代器:要注意System.Out.println(list)这个语句也会隐含的调用迭代器。
  2. 使用免锁容器:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    static final class COWIterator<E> implements ListIterator<E> {
    /** Snapshot of the array */
    private final Object[] snapshot;
    /** Index of element to be returned by subsequent call to next. */
    private int cursor;

    private COWIterator(Object[] elements, int initialCursor) {
    cursor = initialCursor;
    snapshot = elements;
    }

    public boolean hasNext() ;
    public E next() ;
    public int nextIndex();
    public int previousIndex();
    }

上面是免锁后期的迭代器类,可以看到使用的是snapshot快照,并且没有加锁的操作,因此当真正的elements改变的使用,迭代的依然是数组的快照。

乐观锁

乐观锁并不是加锁。而是使用原子操作compareAndSet(oldValue,newValue)实现的一个很有效的一个技巧。
思想:在使用最初获取数据和最后获取的数据进行对比,一致则操作成功,不一致则操作失败。

ReadWriteLock

1
2
Lock rLock = new ReadWriteLock(ture).readLock();
Lock wLock = new ReadWriteLock(ture).writeLock();

可以同时获取多个读锁,但是写锁只有一个,一旦有线程获取到写锁,所有的读锁都会被挂起,直到写锁被释放。
性能上是不可确认的。

此章节只是冰山一角,更多的并发编程参考书籍:《JAVA Concurrency in Practice》《Concurrent Programming in Java》

本文标题:21 并发

文章作者:Sun

发布时间:2018年12月12日 - 15:12

最后更新:2018年12月12日 - 16:12

原始链接:https://sunyi720.github.io/2018/12/12/THING IN JAVA/21 并发/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。