为什么要使用线程池
- 线程池提供了一种限制和管理资源
- 好处:
- 降低资源消耗:线程的创建和销毁需要消耗资源
- 提高响应速度:当任务达到的时候,任务可以不用等待线程创建就能立即执行
- 提高线程的可管理性:线程是稀缺资源,不可以无限创建,需要统一分配和调度、监控
如何创建线程池
使用 Executors
// 允许创建线程的数量为 Integer.MAX_VALUE,可能会导致创建大量线程,从而导致 OOM
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
底层调用 ThreadPoolExecutor
1 |
|
参数
- corePoolSize:核心线程最大数量,通俗点来讲就是,线程池中常驻线程的最大数量
- maxinumPoolSize:线程池中运行最大线程数(包括核心线程和非核心线程)
- keepAliveTime:线程池中空闲线程(仅适用于非核心线程)所能存活的最长时间。
- unit:keepAliveTime 参数的时间单位。
- workQueue:当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被放在队列中。
- threadFactory:executor 创建新线程的时候会用到
- handler :线程池的饱和策略,当前同时运行的线程数量达到最大线程数量并且队列也已经被放满时, ThreadPoolTaskExecutor 定义一些策略
- ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException 来拒绝新任务的处理。默认
- ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用 execute 方法的线程中运行 (run) 被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务的提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
- ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
- ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。
队列执行流程
- 当提交一个新的任务,线程池的处理流程如下
- 判断线程池中核心线程是否已达到阈值;若否:则创建一个核心线程
- 若核心线程已达到阈值,判断阻塞队列 是否已满;若未满:则创建新的任务并添加到阻塞队列
- 若满,在判断线程池中线程数是否达到阈值;若否:则创建一个非核心线程执行任务,
- 若达到阈值,则执行 线程池饱和策略。
从流程的角度分析如下图:
从结构的角度分析如下图:
几种典型的工作队列
- ArrayBlockingQueue:使用数组实现的有界阻塞队列,特性先进先出
- LinkedBlockingQueue:使用链表实现的阻塞队列,特性先进先出,可以设置其容量,默认为
Interger.MAX_VALUE
,特性先进先出 - PriorityBlockingQueue:使用平衡二叉树堆,实现的具有优先级的无界阻塞队列
- DelayQueue:无界阻塞延迟队列,队列中每个元素均有过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最块要过期的元素。
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作,必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
SingleThreadExecutor
1 |
|
特点
- SingleThreadExecutor:方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会保存在一个队列中,等待线程空闲,按先入先出的顺序执行队列中的任务。
- SingleThreadExecutor :使用无界队列 LinkedBlockQueue (队列的容量 Integer.MAX_VALUE)作为线程池的队列
执行流程
- 当线程池中没有线程时,会创建一个新线程来执行任务。
- 当前线程池中有一个线程后,将新任务加入
LinkedBlockingQueue
- 线程执行完第一个任务后,会在一个无限循环中反复从
LinkedBlockingQueue
获取任务来执行。
使用场景
适用于串行执行任务场景
FixedThreadPool
1 |
|
特点
- FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的数量始终不变。当有一个新的任务提交时,线程池中若有空闲的线程,则立即执行。若没有,则新的任务会暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
corePoolSize
等于maximumPoolSize
,所以线程池中只有核心线程- FixedThreadPool :使用无界队列 LinkedBlockQueue (队列的容量 Integer.MAX_VALUE)作为线程池的队列
- 因为最大线程数固定,不会拒绝任务,在任务比较多的时候会导致 OOM
执行流程
- 如果当前运行的线程数少于
corePoolSize
,则创建新线程来执行任务。 - 在线程数目达到
corePoolSize
后,将新任务放到LinkedBlockingQueue
阻塞队列中。 - 线程执行完(1)中任务后,会在循环中反复从
LinkedBlockingQueue
获取任务来执行。
使用场景
适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
CachedThreadPool
1 |
|
特点
核心线程数为0,总线程数量阈值为
Integer.MAX_VALUE
,即可以创建无限的非核心线程CacheThreadPool 允许创建的线程数量为 Integer.MAX_VALUE ,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maxinunPool 中线程处理任务的速度时, CacheThreadPool 会不断创建新的线程,极端情况下,这样会导致耗尽 CPU 和 内存资源。
若等待超过60s,空闲线程就会终止。
执行流程
- 先执行
SynchronousQueue
的offer
方法提交任务,并查询线程池中是否有空闲线程来执行SynchronousQueue
的poll
方法来移除任务。如果有,则配对成功,将任务交给这个空闲线程 - 否则,配对失败,创建新的线程去处理任务
- 当线程池中的线程空闲时,会执行
SynchronousQueue
的poll
方法等待执行SynchronousQueue
中新提交的任务。
使用场景
执行大量短生命周期任务。因为maximumPoolSize
是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool
适用于处理大量、耗时少的任务。
ScheduledThreadPool
特点
- ScheduledThreadPool:创建一个定长线程池,支持 定时及周期性任务执行
- 非核心线程存活时间为0,所以线程池仅仅包含固定数目的核心线程。
- ScheduledThreadPoolExecutor :使用的任务队列 DelayQueue 封装了一个 PriortyQueue 会对队列中的任务进行排序,执行所需时间段的放在前面先被执行 (ScheduleFuture 的 time 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行 (ScheduleFuture 的 squenceNumber 变量小的先执行)。
执行流程
- 两种方式提交任务:
- scheduleAtFixedRate: 按照固定速率周期执行
- scheduleWithFixedDelay:上个任务延迟固定时间后执行
使用场景
周期性执行任务,并且需要限制线程数量的场景
线程池的状态
线程池有这几个状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
1 | //线程池状态 |
RUNNING
- 该状态的线程池会接收新任务,并处理阻塞队列中的任务;
- 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态;
- 调用线程池的shutdownNow()方法,可以切换到STOP状态;
SHUTDOWN
- 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
- 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态;
STOP
- 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
- 线程池中执行的任务为空,进入TIDYING状态;
TIDYING
- 该状态表明所有的任务已经运行终止,记录的任务数量为0。
- terminated()执行完毕,进入TERMINATED状态
TERMINATED
- 该状态表示线程池彻底终止
线程池异常的处理方法
- 我们可以直接try…catch捕获。
- 通过Future对象的get方法接收抛出的异常,再处理
- 重写ThreadPoolExecutor的afterExecute方法,处理传递的异常引用
- 为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常
ThreadPoolExecutor中的execute方法和submit方法区别
- execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否
- submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get() 方法来获取返回值, get() 方法会阻塞当前线程直到任务完成,而使用 get(long timeout, TimeUnit unit) 方法会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
shutdown()和shutDownNow()
- shutdown():关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
- shutdownNow():关闭线程池,线程的状态变为 STOP。线程池会终止正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。
isTerminated() 和 isShutdown()
- isShutDown 当调用 shutdown() 方法后返回为 true。
- isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true
1 | private static final int CORE_POOL_SIZE = 5; |
1 | public static void main(String[] args) { |
线程池的线程数量
需要根据实际业务场景来划分
- CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
Executor 框架结构
- 任务(Runnable /Callable)
- 执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。
- 任务的执行(Executor)
- 包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。
- 异步计算的结果(Future)
- Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)
Executor 的使用过程
- 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
- 把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable
task))。 - 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
- 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
参考资料: