《Java并发编程的艺术》读书笔记(终章)

笔记内容包含第九章~第十章

线程池

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。
在开发过程中,合理地使用线程池能够带来3个好处:

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

但是,要做到合理利用线程池,必须对其实现原理了如指掌。

线程池的实现原理

当向线程池提交一个任务之后,线程池是如何处理这个任务的呢? 来看一下Jdk8中线程池的主要处理流程,处理流程图如下图所示:

图源:java8线程池

  1. 如果运行的线程小于corePoolSize,则尝试以给定的命令(Runnable command)作为其第一个任务启动一个新线程。对addWorker方法的调用以原子地检查runStateworkerCount,从而通过返回false,可以防止在不应该添加线程时发生错误警报。
  2. 如果任务可以成功排队,那么我们仍然需要双重检查(double-check)是否应该添加一个线程(因为自上次检查后现有的线程已经死亡)或者自从进入此方法后池关闭了。所以我们重新检查状态,如果线程池状态停止并且任务从队列移除的话,拒绝该任务,如果没有,则启动新的线程。
  3. 如果我们不能排队任务,那么我们尝试添加一个新线程。 如果失败,我们知道线程池已关闭或饱和,因此拒绝该任务。

Note: 由于书上这部分的介绍对Jdk8来说已过时,上面笔者截取自Jdk8的ThreadPoolExecutorexecute方法的注释。

线程池的使用

Java线程池的创建

1
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue RejectedExecutionHandler handler)

参数解释:

  • corePoolSize: 线程池维护线程的最少线程数,也是核心线程数,包括空闲线程
  • maximumPoolSize: 线程池维护线程的最大线程数。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。
  • keepAliveTime: 线程池维护线程所允许的空闲时间
  • unit: 程池维护线程所允许的空闲时间的单位
  • workQueue: 线程池所使用的缓冲队列/阻塞队列
  • handler: 线程池对拒绝任务的处理策略

当一个任务通过execute(Runnable)方法欲添加到线程池时:

  1. 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
  2. 如果此时线程池中的数量等于corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
  3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
  4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程 maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
  5. 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

可以选择以下几个阻塞队列:

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
  • PriorityBlockingQueue:一个具有优先级的无界阻塞队列。

向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。

submit()方法用于提交需要返回值的任务。线程池会返回一个Future类型的对象,通过这个Future对象可以判断任务是否执行成功,并且可以通过Future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

关闭线程池

可以通过调用线程池的shutdownshutdownNow方法来关闭线程池。
它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务
可能永远无法终止。

区别:

shutdownNow方法首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown方法只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

合理的配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
  • 任务的优先级:高、中和低。
  • 任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。

  • CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。
  • 由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。
  • 混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。

可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。

建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。

作者举的例子:

有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务。

执行器

Executor框架包含的主要的类与接口如下图所示:

下面是这些类和接口的简介。

  • Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
  • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
  • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
  • Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
  • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。

ThreadPoolExecutor

通常使用工厂类Executors来创建。Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool.

1)FixedThreadPool,下面是Executors提供的,创建使用固定线程数的FixedThreadPool的API。

1
2
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。

下面是newFixedThreadPool方法源码:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

  • 它是一种固定大小的线程池;
  • corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
  • keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;
  • 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;
  • 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
  • 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。

2) SingleThreadExecutor,下面是Executors提供的,创建使用单个线程的SingleThreadExecutor的API。

1
2
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。

下面是newSingleThreadExecutor方法源码:

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
  • 它只会创建一条工作线程处理任务;
  • 其它作用与FixedThreadPool相同;

3)CachedThreadPool,下面是Executors提供的,创建一个会根据需要创建新线程的CachedThreadPool的API。

1
2
public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。

下面是newCachedThreadPool方法源码:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • 它是一个可以无限扩大的线程池;
  • 它比较适合处理执行时间比较小的任务;
  • corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
  • keepAliveTime为60s,意味着线程空闲时间超过60s就会被杀死;
  • 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

在步骤2)中新创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1)),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

总结:

FixedThreadPool和SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

ScheduledThreadPoolExecutor

通常使用工厂类Executors来创建。Executors可以创建2种类型的ScheduledThreadPoolExecutor,如下:

1)ScheduledThreadPoolExecutor: 包含若干个线程的ScheduledThreadPoolExecutor。

下面是工厂类Executors提供的,创建固定个数线程的ScheduledThreadPoolExecutor的API。

1
2
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。

2)SingleThreadScheduledExecutor: 只包含一个线程的ScheduledThreadPoolExecutor。

下面是Executors提供的,创建单个线程的SingleThreadScheduledExecutor的API。

1
2
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。

ScheduledThreadPoolExecutor的执行主要分为两大部分。

  1. 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayedWorkQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask对象。
  2. 线程池中的线程从DelayedWorkQueue中获取ScheduledFutureTask,然后执行任务。

scheduleAtFixedRate方法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

在ScheduledThreadPoolExecutor类中的ScheduledFutureTask待调度任务对象主要包含3个成员变量:

1
2
3
private final long sequenceNumber;  // 表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。
private long time; // 表示这个任务将要被执行的具体时间,单位:纳秒。
private final long period; // 表示任务执行的间隔周期。

DelayedWorkQueue是一个在ScheduledThreadPoolExecutor类中实现BlockingQueue接口的无界阻塞队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中没有什么意义(设置maximumPoolSize的大小没有什么效果)。

Note: 由于书中介绍的DelayQueue无界阻塞队列在Jdk8中不再是ScheduledThreadPoolExecutor的默认实现,所以这里说明改为DelayedWorkQueue,如果有误欢迎指出。

Future

当我们把Runnable接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或ScheduledThreadPoolExecutor时,ThreadPoolExecutor或ScheduledThreadPoolExecutor会向我们返回一个实现Future接口的对象。下面是对应的API:

1
2
3
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);

Note: 由于书上这部分的介绍对Jdk8来说已过时,以下是笔者的总结:

FutureTask可以处于下面几种状态:

1
2
3
4
5
6
7
private static final int NEW          = 0;  // 任务初始化,未运行
private static final int COMPLETING = 1; // 任务正在运行
private static final int NORMAL = 2; // 任务已运行完成
private static final int EXCEPTIONAL = 3; // 任务运行异常
private static final int CANCELLED = 4; // 任务被取消
private static final int INTERRUPTING = 5; // 任务正在被中断
private static final int INTERRUPTED = 6; // 任务已中断

可能的状态转换:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

FutureTask状态转换图

图源:FutureTask解析

Runnable接口和Callable接口

Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。
它们之间的区别是Runnable不会返回结果,而Callable可以返回结果。

除了可以自己创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个Runnable包装成一个Callable。

下面是Executors提供的,把一个Runnable包装成一个Callable的API:

1
2
public static Callable<Object> callable(Runnable task)
public static <T> Callable<T> callable(Runnable task, T result)

以上,如有问题欢迎提出!

如果对您有所帮助,欢迎投食!
0%