0%

java多线程--线程池

为什么要使用线程池

  • 线程池提供了一种限制和管理资源
  • 好处:
    • 降低资源消耗:线程的创建和销毁需要消耗资源
    • 提高响应速度:当任务达到的时候,任务可以不用等待线程创建就能立即执行
    • 提高线程的可管理性:线程是稀缺资源,不可以无限创建,需要统一分配和调度、监控

如何创建线程池

使用 Executors

// 允许创建线程的数量为 Integer.MAX_VALUE,可能会导致创建大量线程,从而导致 OOM

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

// 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);

底层调用 ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

参数

  • corePoolSize:核心线程最大数量,通俗点来讲就是,线程池中常驻线程的最大数量
  • maxinumPoolSize:线程池中运行最大线程数(包括核心线程和非核心线程)
  • keepAliveTime:线程池中空闲线程(仅适用于非核心线程)所能存活的最长时间。
  • unit:keepAliveTime 参数的时间单位。
  • workQueue:当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被放在队列中
  • threadFactory:executor 创建新线程的时候会用到
  • handler :线程池的饱和策略,当前同时运行的线程数量达到最大线程数量并且队列也已经被放满时, ThreadPoolTaskExecutor 定义一些策略
    • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException 来拒绝新任务的处理。默认
    • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用 execute 方法的线程中运行 (run) 被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务的提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
    • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
    • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。

队列执行流程

  • 当提交一个新的任务,线程池的处理流程如下
    • 判断线程池中核心线程是否已达到阈值;若否:则创建一个核心线程
    • 若核心线程已达到阈值,判断阻塞队列 是否已满;若未满:则创建新的任务并添加到阻塞队列
    • 若满,在判断线程池中线程数是否达到阈值;若否:则创建一个非核心线程执行任务
    • 若达到阈值,则执行 线程池饱和策略

从流程的角度分析如下图:

1613900704650

从结构的角度分析如下图:

1613901625686

几种典型的工作队列

  • ArrayBlockingQueue:使用数组实现的有界阻塞队列,特性先进先出
  • LinkedBlockingQueue:使用链表实现的阻塞队列,特性先进先出,可以设置其容量,默认为 Interger.MAX_VALUE ,特性先进先出
  • PriorityBlockingQueue:使用平衡二叉树,实现的具有优先级的无界阻塞队列
  • DelayQueue:无界阻塞延迟队列,队列中每个元素均有过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最块要过期的元素。
  • SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作,必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态

SingleThreadExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

特点

  • SingleThreadExecutor:方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会保存在一个队列中,等待线程空闲,按先入先出的顺序执行队列中的任务。
  • SingleThreadExecutor :使用无界队列 LinkedBlockQueue (队列的容量 Integer.MAX_VALUE)作为线程池的队列

执行流程

  • 当线程池中没有线程时,会创建一个新线程来执行任务。
  • 当前线程池中有一个线程后,将新任务加入LinkedBlockingQueue
  • 线程执行完第一个任务后,会在一个无限循环中反复从LinkedBlockingQueue 获取任务来执行。

1610065305945

使用场景

​ 适用于串行执行任务场景

FixedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

特点

  • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的数量始终不变。当有一个新的任务提交时,线程池中若有空闲的线程,则立即执行。若没有,则新的任务会暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • corePoolSize等于maximumPoolSize,所以线程池中只有核心线程
  • FixedThreadPool :使用无界队列 LinkedBlockQueue (队列的容量 Integer.MAX_VALUE)作为线程池的队列
  • 因为最大线程数固定,不会拒绝任务,在任务比较多的时候会导致 OOM

执行流程

  • 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
  • 在线程数目达到corePoolSize后,将新任务放到LinkedBlockingQueue阻塞队列中。
  • 线程执行完(1)中任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

1610064973712

使用场景

适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。

CachedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

特点

  • 核心线程数为0,总线程数量阈值为Integer.MAX_VALUE,即可以创建无限的非核心线程

  • CacheThreadPool 允许创建的线程数量为 Integer.MAX_VALUE ,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maxinunPool 中线程处理任务的速度时, CacheThreadPool 会不断创建新的线程,极端情况下,这样会导致耗尽 CPU 和 内存资源。

  • 若等待超过60s,空闲线程就会终止。

执行流程

  • 先执行SynchronousQueueoffer方法提交任务,并查询线程池中是否有空闲线程来执行SynchronousQueuepoll方法来移除任务。如果有,则配对成功,将任务交给这个空闲线程
  • 否则,配对失败,创建新的线程去处理任务
  • 当线程池中的线程空闲时,会执行SynchronousQueuepoll方法等待执行SynchronousQueue中新提交的任务。

1613903401127

使用场景

执行大量短生命周期任务。因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务。

ScheduledThreadPool

特点

  • ScheduledThreadPool:创建一个定长线程池,支持 定时及周期性任务执行
  • 非核心线程存活时间为0,所以线程池仅仅包含固定数目的核心线程。
  • ScheduledThreadPoolExecutor :使用的任务队列 DelayQueue 封装了一个 PriortyQueue 会对队列中的任务进行排序,执行所需时间段的放在前面先被执行 (ScheduleFuture 的 time 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行 (ScheduleFuture 的 squenceNumber 变量小的先执行)。

1610065993004

执行流程

  • 两种方式提交任务:
    • scheduleAtFixedRate: 按照固定速率周期执行
    • scheduleWithFixedDelay:上个任务延迟固定时间后执行

使用场景

周期性执行任务,并且需要限制线程数量的场景

线程池的状态

线程池有这几个状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。

1
2
3
4
5
6
//线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

1613904180448

RUNNING

  • 该状态的线程池会接收新任务,并处理阻塞队列中的任务;
  • 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态;
  • 调用线程池的shutdownNow()方法,可以切换到STOP状态;

SHUTDOWN

  • 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  • 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态;

STOP

  • 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
  • 线程池中执行的任务为空,进入TIDYING状态;

TIDYING

  • 该状态表明所有的任务已经运行终止,记录的任务数量为0。
  • terminated()执行完毕,进入TERMINATED状态

TERMINATED

  • 该状态表示线程池彻底终止

线程池异常的处理方法

  1. 我们可以直接try…catch捕获。
  2. 通过Future对象的get方法接收抛出的异常,再处理
  3. 重写ThreadPoolExecutor的afterExecute方法,处理传递的异常引用
  4. 为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常

1613904400368

ThreadPoolExecutor中的execute方法和submit方法区别

  • execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否
  • submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get() 方法来获取返回值, get() 方法会阻塞当前线程直到任务完成,而使用 get(long timeout, TimeUnit unit) 方法会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

shutdown()和shutDownNow()

  • shutdown():关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
  • shutdownNow():关闭线程池,线程的状态变为 STOP。线程池会终止正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

isTerminated() 和 isShutdown()

  • isShutDown 当调用 shutdown() 方法后返回为 true。
  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 2;
private static final Long KEEP_ALIVE_TIME = 1L;

public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,//
MAX_POOL_SIZE,//
KEEP_ALIVE_TIME,//
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy()
);

for (int i = 0; i < 10; i++) {
Runnable worker = new MyRunnable("" + i);
// executor.execute(worker);
Future<?> result = executor.submit(worker);
}
//终止线程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());

List<Future<String>> futureList = new ArrayList<>();
Callable<String> callable = new MyCallable();
for (int i = 0; i < 10; i++) {
//提交任务到线程池
Future<String> future = executor.submit(callable);
//将返回值 future 添加到 list,我们可以通过 future 获得 执行 Callable 得到的返回值
futureList.add(future);
}
for (Future<String> fut : futureList) {
try {
System.out.println(new Date() + "::" + fut.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
//关闭线程池
executor.shutdown();
}

线程池的线程数量

需要根据实际业务场景来划分

  • CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
  • I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

Executor 框架结构

  • 任务(Runnable /Callable)
    • 执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。
  • 任务的执行(Executor)
    • 包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。
  • 异步计算的结果(Future)
    • Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)

Executor 的使用过程

  • 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
  • 把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable task))。
  • 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
  • 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

参考资料:

https://zhuanlan.zhihu.com/p/73990200


----------- 本文结束啦感谢您阅读 -----------