28Java多线程之线程池

推荐:

视频教程:https://www.bilibili.com/video/BV1wh411e7nd

对应代码:https://github.com/hellozhaolq/2021-Java-ThreadPool-Tutorial

对于数据库连接,我们经常听到数据库连接池这个概念。因为建立数据库连接是非常耗时的一个操作,其中涉及到网络IO的一些操作。因此就想出把连接通过一个连接池来管理。需要连接的话,就从连接池里取一个。当使用完了,就“关闭”连接,这不是真正意义上的关闭,只是把连接放回池里,供其他人在使用。所以对于线程,也有了线程池这个概念,其中的原理和数据库连接池差不多。

程序启动一个新线程成本是比较高的,因为它涉及到要与操作系统进行交互。而使用线程池可以很好的提高性能,尤其是当程序中要创建大量生存期很短的线程时,更应该考虑使用线程池。线程池里的每一个线程代码结束后,并不会死亡,而是再次回到线程池中成为空闲状态,等待下一个对象来使用。

在 JDK5 之前,我们必须手动实现自己的线程池,从JDK5开始,Java内置支持线程池。

作用

线程池就是限制系统中使用线程的数量以及更好的使用线程

根据系统的运行情况,可以自动或手动设置线程数量,达到运行的最佳效果:配置少了,将影响系统的执行效率,配置多了,又会浪费系统的资源。用线程池配置数量,其他线程排队等候。当一个任务执行完毕后,就从队列中取一个新任务运行,如果没有新任务,那么这个线程将等待。如果来了一个新任务,但是没有空闲线程的话,那么把任务加入到等待队列中。

为什么要用线程池?

  • 减少线程创建和销毁的次数,使线程可以多次复用
  • 可以根据系统情况,调整线程的数量。防止创建过多的线程,消耗过多的内存(每个线程1M左右)

线程池 UML 类图

image-20241129103158944

Executor

Java 里面线程池的顶级接口是 java.util.concurrent.Executor,但是严格意义上讲 Executor 并不是一个线程池,它只是一个执行线程的工具。

真正的线程池接口是 java.util.concurrent.ExecutorService

1
2
3
4
5
6
7
8
9
10
public interface Executor {
/**
* 在将来的某个时间执行给定的命令。该命令可以在新线程、池线程或调用线程中执行,由Executor实现决定。
*
* 参数:command – 可运行的任务
* 抛出:RejectedExecutionException – 如果此任务不能被接受执行
* NullPointerException – 如果命令为空
*/
void execute(Runnable command);
}

ExecutorService

ExecutorService 对象表示一个线程池,可以执行 Runnable 对象或者 Callable 对象代表的线程。

全部接口方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
public interface ExecutorService extends Executor {
/**
* 启动有序关闭,继续执行先前已提交的任务,但不会接受新任务。如果已经关闭,调用没有额外的效果。
*
* 该方法不等待之前提交的任务完成执行,使用awaitTermination来做到这一点。(人话:该方法无返回值,无法判断池中任务是否完成执行,使用awaitTermination方法判断)
*
* 抛出:SecurityException – 如果存在安全管理器并且关闭此 ExecutorService 可能会操纵调用者不允许修改的线程,
* 因为它不持有RuntimePermission ("modifyThread") ,或者安全管理器的checkAccess方法拒绝访问。
*/
void shutdown();

/**
* 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
* 除了尽最大努力停止处理正在执行的任务之外,没有任何保证。例如,典型的实现将通过Thread.interrupt取消,
* 因此任何无法响应interrupt的任务可能永远不会终止。
*
* 此方法不等待正在执行的任务终止,使用awaitTermination来做到这一点。
*
* 返回:从未开始执行的任务列表
* 抛出:SecurityException – 如果存在安全管理器并且关闭此 ExecutorService 可能会操纵调用者不允许修改的线程,
* 因为它不持有RuntimePermission ("modifyThread") ,或者安全管理器的checkAccess方法拒绝访问。
*/
List<Runnable> shutdownNow();

/**
* 线程池状态是否 >= SHUTDOWN,ThreadPoolExecutor的实现 runStateAtLeast(ctl.get(), SHUTDOWN)
*
* 返回:如果此executor已关闭,则为true。
*/
boolean isShutdown();

/**
* 线程池状态是否 >= TERMINATED,ThreadPoolExecutor的实现 runStateAtLeast(ctl.get(), TERMINATED)
*
* 如果关闭后所有任务均已完成,则返回true。请注意,除非首先调用shutdown或shutdownNow,否则isTerminated永远不会为true。
*
* 返回:如果关闭后所有任务均已完成,则为true
*/
boolean isTerminated();

/**
* 监测ExecutorService是否在等待时间内关闭
*
* 参数:timeout – 等待的最长时间
* unit – 超时参数的时间单位
* 返回:如果executor终止,则为true;如果超时未终止,则为false
* 抛出:InterruptedException – 如果在等待时被中断
*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;


/********************************************************************************/


/**
* 提交Runnable任务以执行并返回代表该任务的Future。Future的get方法将在成功完成后 返回null,返回null,返回null。
*
* 参数:task – 要提交的任务
* 返回:代表待完成任务的 Future
* 抛出:RejectedExecutionException – 如果无法安排任务执行
* NullPointerException – 如果任务为空
*/
Future<?> submit(Runnable task);

/**
* 提 Runnable任务以执行并返回代表该任务的Future。Future的get方法将在成功完成后返回给定的结果。
*
* 参数:task – 要提交的任务
* result - 要返回的结果
* 类型参数:<T> – 结果的类型
* 返回:代表待完成任务的 Future
* 抛出:RejectedExecutionException – 如果无法安排任务执行
* NullPointerException – 如果任务为空
*/
<T> Future<T> submit(Runnable task, T result);

/**
* 提交一个返回值的任务以供执行,并返回一个表示该任务待处理结果的 Future。 Future 的get方法将在成功完成后返回任务的结果。
* 如果您想立即阻止等待任务,可以使用result = exec.submit(aCallable).get();形式的结构。
*
* 注意:Executors类包含一组方法,可以将一些其他常见的类似闭包的对象(例如java.security.PrivilegedAction )转换为Callable形式,
* 以便可以提交它们。
*
* 参数:task – 要提交的任务
* 类型参数:<T> – 任务结果的类型
* 返回:代表待完成任务的 Future
* 抛出:RejectedExecutionException – 如果无法安排任务执行
* NullPointerException – 如果任务为空
*/
<T> Future<T> submit(Callable<T> task);

/**
* 执行给定的任务,当全部完成时,返回保存其状态和结果的Futures列表。
* 对于返回列表的每个元素调用Future.isDone都返回true。请注意,已完成的任务可能已经正常终止,也可能通过引发异常终止。
* 如果在此操作正在进行时修改了给定的集合,则此方法的结果是不确定的。
*
* 参数:tasks — 任务的集合
* 类型参数:<T> – 从任务返回的值的类型
* 返回:表示任务的Futures列表,与给定任务列表的迭代器生成的顺序相同,每个任务都已完成
* 抛出:InterruptedException – 如果在等待时被中断,在这种情况下,未完成的任务将被取消
* NullPointerException – 如果任务或其任何元素为null
* RejectedExecutionException – 如果任何任务无法安排执行
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

/**
* 执行给定的任务,当全部完成或超时到期(以先发生者为准)时,返回保存其状态和结果的Futures列表。
* 对于返回列表的每个元素调用Future.isDone都返回true。返回时,取消未完成的任务。请注意,已完成的任务可能已经正常终止,也可能通过引发异常终止。
* 如果在此操作正在进行时修改了给定的集合,则此方法的结果是不确定的。
*
* 参数:tasks — 任务的集合
* timeout – 等待的最长时间
* unit – 超时参数的时间单位
* 类型参数:<T> – 从任务返回的值的类型
* 返回:表示任务的Futures列表,其顺序与给定任务列表的迭代器生成的顺序相同。如果操作没有超时,则每个任务都将完成。如果确实超时,其中一些任务将无法完成。
* 抛出:InterruptedException – 如果在等待时被中断,在这种情况下,未完成的任务将被取消
* NullPointerException – 如果任务、其任何元素或单元为null
* RejectedExecutionException – 如果任何任务无法安排执行
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException;

/**
* 执行给定任务,返回已成功完成的任务的结果(即不抛出异常),如果有的话。
* 正常或异常返回时,取消未完成的任务。
* 如果在此操作正在进行时修改了给定的集合,则此方法的结果是不确定的。
*
* 参数:tasks — 任务的集合
* 类型参数:<T> – 从任务返回的值的类型
* 返回:其中一项任务返回的结果(最先完成的执行结果)
* 抛出:InterruptedException – 如果在等待时被中断
* NullPointerException – 如果任务或任何要执行的元素任务为null
* IllegalArgumentException – 如果任务为空
* ExecutionException – 如果没有任务成功完成
* RejectedExecutionException – 如果无法安排任务执行
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

/**
* 执行给定任务,返回已成功完成的任务的结果(即不抛出异常),如果未超时。
* 正常或异常返回时,取消未完成的任务。
* 如果在此操作正在进行时修改了给定的集合,则此方法的结果是不确定的。
*
* 参数:tasks — 任务的集合
* timeout – 等待的最长时间
* unit – 超时参数的时间单位
* 类型参数:<T> – 从任务返回的值的类型
* 返回:其中一项任务返回的结果(最先完成的执行结果)
* 抛出:InterruptedException – 如果在等待时被中断
* NullPointerException – 如果任务、单元或任何要执行的元素任务为null
* TimeoutException – 如果超过指定时间且没有任何任务成功完成
* ExecutionException – 如果没有任务成功完成
* RejectedExecutionException – 如果无法安排任务执行
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService

提供 ExecutorService 执行方法的默认实现。该类使用 newTaskFor(返回 RunnableFuture)实现了 submit 、 invokeAny 和 invokeAll 方法,默认为 java.util.concurrent 包中提供的 FutureTask 类。例如, submit(Runnable) 的实现创建一个执行并返回的关联 RunnableFuture 。子类可以重写 newTaskFor方法以返回除 FutureTask 之外的 RunnableFuture 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/*
新增的两个newTaskFor方法:
1、供自身实现的方法调用;
2、供子类重写(例如 ForkJoinPool 类)
*/
public abstract class AbstractExecutorService implements ExecutorService {
/**
* 返回给定Runnable和默认值的 RunnableFuture 。
*
* 参数:runnable – 被包装的Runnable
* value – 返回的 future 的默认值
* 类型参数:<T> – 给定值的类型
* 返回:RunnableFuture,当运行时,将运行底层Runnable,并且作为Future,将产生给定值作为其结果,并提供底层任务的取消
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

/**
* 返回给定Callable的 RunnableFuture 。
*
* 参数:callable – 被包装的Callable
* 类型参数:<T> – Callable结果的类型
* 返回:RunnableFuture,当运行时,将运行底层Callable,并且作为Future ,将产生给定值作为其结果,并提供底层任务的取消
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
}

ScheduledExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public interface ScheduledExecutorService extends ExecutorService {
/**
* 创建并执行在给定延迟后启用的一次性操作。
*
* 参数:command - 要执行的任务
* delay – 从现在开始延迟执行的时间
* unit – 延迟参数的时间单位
* 返回:一个 ScheduledFuture 表示任务的挂起完成,其get()方法将在完成时返回null(因为是Runnable任务)。
* 抛出:RejectedExecutionException – 如果无法安排任务执行
* NullPointerException – 如果命令为空
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

/**
* 创建并执行在给定延迟后启用的 ScheduledFuture,一次性操作。
*
* 参数:callable – 要执行的函数
* delay – 从现在开始延迟执行的时间
* unit – 延迟参数的时间单位
* 类型参数:<V> – 可调用结果的类型
* 返回:可用于提取结果或取消的 ScheduledFuture
* 抛出:RejectedExecutionException – 如果无法安排任务执行
* NullPointerException – 如果 callable 为 null
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

/**
* 创建并执行一个周期性动作,该动作在给定的初始延迟后首先启用,随后在给定的周期内启用;也就是说,执行将在initialDelay之后开始,
* 然后是 initialDelay+period,然后是 initialDelay + 2*period,依此类推。
* 如果此任务的任何执行遇到异常,则后续执行将被抑制。否则,任务只会通过取消或终止executor来终止。
* 如果此任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会同时执行。
*
* 参数:command - 要执行的任务
* initialDelay – 延迟首次执行的时间
* period周期 – 连续执行之间的时间段,可以理解为个执行的开始和下一个执行的开始之间的延迟
* unit – initialDelay 和 period 参数的时间单位
* 返回:表示待完成任务的 ScheduledFuture,其get()方法将在取消时抛出异常
* 抛出:RejectedExecutionException – 如果无法安排任务执行
* NullPointerException – 如果命令为空
* IllegalArgumentException – 如果 period 小于或等于零
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

/**
* 创建并执行一个周期性操作,该操作首先在给定的初始延迟后启用,随后在一个执行的终止和下一个执行的开始之间具有给定的延迟。
* 如果此任务的任何执行遇到异常,则后续执行将被抑制。否则,任务只会通过取消或终止executor来终止。
*
* 参数:command - 要执行的任务
* initialDelay – 延迟首次执行的时间
* delay延迟 - 一个执行的终止和下一个执行的开始之间的延迟
* unit – initialDelay 和 delay 参数的时间单位
* 返回:表示待完成任务的 ScheduledFuture,其get()方法将在取消时抛出异常
* 抛出:RejectedExecutionException – 如果无法安排任务执行
* NullPointerException – 如果命令为空
* IllegalArgumentException – 如果延迟小于或等于零
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

线程池的创建

https://www.cnblogs.com/pcheng/p/13540619.html

https://www.cnblogs.com/vhua/tag/%E5%B9%B6%E5%8F%91/

Java 中创建线程池主要有两种方法:

  • 通过 Executors 工厂类创建,该类提供了4种不同的线程池可供使用。
  • 通过 ThreadPoolExecutor 类进行自定义创建,推荐

Executors工厂类-不推荐

java.util.concurrent.Executors 类提供了 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。该类支持以下几种方法:

  • 创建并返回常用配置的 ExecutorService 的方法。
  • 创建并返回常用配置的 ScheduledExecutorService 的方法。
  • 创建并返回“包装的”ExecutorService 的方法,该方法通过使特定于实现的方法不可访问来禁用重新配置。
  • 创建并返回ThreadFactory的方法,该 ThreadFactory 将新创建的线程设置为已知状态。
  • 从其他类似闭包的形式创建并返回Callable方法,因此它们可以在需要Callable执行方法中使用。

创建线程工厂

Executors 提供两个线程工厂(ThreadFactory接口)的实现类及工厂方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

/**
* 返回用于创建新线程的默认线程工厂。该工厂在同一个ThreadGroup中创建Executor使用的所有新线程。
* 如果有一个SecurityManager,设置为System.getSecurityManager的组,否则设置为调用这个defaultThreadFactory方法的线程组。
* 每个新线程都作为非守护线程创建,优先级设置为Thread.NORM_PRIORITY和线程组中允许的最大优先级中的较小者。
* 新线程的名称可通过pool-N-thread-M的Thread.getName访问,其中N是该工厂的序列号,M是该工厂创建的线程的序列号。
*
* 返回:线程工厂
*/
public static ThreadFactory defaultThreadFactory()

/**
* 返回一个线程工厂,用于创建与当前线程具有相同权限的新线程。
* 该工厂创建与defaultThreadFactory具有相同设置的线程,另外将新线程的AccessControlContext和contextClassLoader设置为与调用
* 此privilegedThreadFactory方法的线程相同。可以在AccessController.doPrivileged操作中创建一个新的privilegedThreadFactory,
* 设置当前线程的访问控制上下文,以创建在该操作中保持所选权限设置的线程。
*
* 请注意,虽然在此类线程中运行的任务将具有与当前线程相同的访问控制和类加载器设置,但它们不需要具有相同的ThreadLocal或
* InheritableThreadLocal值。如有必要,可以使用ThreadPoolExecutor.beforeExecute(Thread, Runnable)在ThreadPoolExecutor子类
* 中运行任何任务之前设置或重置线程局部变量的特定值。此外,如果需要初始化工作线程以使其具有与其他指定线程相同的InheritableThreadLocal设置,
* 您可以创建一个自定义ThreadFactory,该线程在其中等待并服务请求以创建将继承其值的其他线程。
*
* 返回:线程工厂
* 抛出:AccessControlException – 如果当前访问控制上下文没有获取和设置上下文类加载器的权限
*/
public static ThreadFactory privilegedThreadFactory()

newFixedThreadPool-慎用

固定大小的线程池,允许的请求队列 LinkedBlockingQueue 容量为 Integer.MAX_VALUE( 2^31≈21亿5千万),可能会堆积大量的请求,从而导致OOM。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 创建一个线程池,该线程池重用在共享无界队列上运行的固定数量的线程。在任何时候,最多 nThreads 个线程将是活动的处理任务。
* 如果在所有线程都处于活动状态时提交了其他任务,它们将在无界队列中等待,直到有线程可用。
* 如果任何线程在关闭之前的执行过程中由于失败而终止,而后续还有任务需要执行,那么新的线程将取代它。池中的线程将一直存在,直到显式关闭。
*
* 参数:nThreads - 池中的线程数
* 返回:新创建的线程池
* 抛出:IllegalArgumentException – 如果 nThreads <= 0
*/
public static ExecutorService newFixedThreadPool(int nThreads)

/**
* 同上
* 在需要时使用提供的 ThreadFactory 创建新线程。
*
* 参数:nThreads - 池中的线程数
* threadFactory - 创建新线程时使用的工厂
* 返回:新创建的线程池
* 抛出: NullPointerException – 如果 threadFactory 为 null
* IllegalArgumentException – 如果 nThreads <= 0
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

newSingleThreadExecutor-慎用

单个线程的线程池,允许的请求队列 LinkedBlockingQueue 容量为 Integer.MAX_VALUE( 2^31≈21亿5千万),可能会堆积大量的请求,从而导致OOM。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 创建一个 Executor,它使用单个工作线程操作无界队列。
* 请注意,如果该单线程在关闭前的执行过程中因失败而终止,而后续还有任务需要执行,那么新的线程将取代它。
* 保证所有任务按顺序执行。
* 与其他等效的 newFixedThreadPool(1) 不同,返回的executor保证不可重新配置以使用额外的线程。
*
* 返回:新创建的单线程 Executor
*/
public static ExecutorService newSingleThreadExecutor()

/**
* 创建一个 Executor,它使用单个工作线程操作无界队列,并在需要时使用提供的 ThreadFactory 创建一个新线程。
* 与其他等效的 newFixedThreadPool(1, threadFactory) 不同,返回的executor保证不能重新配置以使用其他线程。
*
* 参数:threadFactory – 创建新线程时使用的工厂
* 返回:新创建的单线程 Executor
* 抛出:NullPointerException – 如果 threadFactory 为 null
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new AutoShutdownDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

newCachedThreadPool-慎用

可缓存的线程池,使用 SynchronousQueue,允许创建的线程数量为 Integer.MAX_VALUE( 2^31≈21亿5千万),可能会创建大量的线程,从而导致OOM。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 创建一个线程池,根据需要创建新线程,但在可用时将重用以前构造的线程。
* 这些池通常会提高执行许多短期异步任务的程序的性能。
* 如果可用,对执行的调用将重用以前构造的线程。如果没有可用的现有线程,将创建一个新线程并将其添加到池中。
* 六十秒内未使用的线程将被终止并从缓存中删除。
* 因此,保持空闲足够长时间的池不会消耗任何资源。
* 请注意,可以使用 ThreadPoolExecutor 构造函数创建具有相似属性但细节不同(例如超时参数)的池。
*
* 返回:新创建的线程池
*/
public static ExecutorService newCachedThreadPool()

/**
* 创建一个线程池,根据需要创建新线程,但会在可用时重用以前构造的线程,并在需要时使用提供的 ThreadFactory 创建新线程。
*
* 参数:threadFactory – 创建新线程时使用的工厂
* 返回:新创建的线程池
* 抛出:NullPointerException – 如果 threadFactory 为 null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

newWorkStealingPool

fork-join 形式的线程池

1
2
3
4
5
6
7
8
9
/**
* 创建一个线程池,该线程池维护足够的线程来支持给定的并行级别,并且可以使用多个队列来减少争用。并行级别对应于主动参与或可用于参与
* 任务处理的线程的最大数量。实际线程数可能会动态增长和收缩。work-stealing pool 不保证提交任务的执行顺序。
*
* 参数:parallelism – 目标并行度
* 返回:新创建的线程池
* 抛出:IllegalArgumentException – 如果 parallelism <= 0
*/
public static ExecutorService newWorkStealingPool(int parallelism)

newSingleThreadScheduledExecutor

执行延时或定期任务的单个线程的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 创建一个单线程执行程序(executor),可以安排命令在给定延迟后运行,或定期执行。
* (但请注意,如果该单线程在关闭前的执行过程中因失败而终止,而后续还有任务需要执行,那么新的线程将取代它。)
* 保证所有任务按顺序执行,并且在任何给定时间都不会有超过一个任务处于活动状态。
* 与其他等效的newScheduledThreadPool(1)不同,返回的executor保证不可重新配置以使用额外的线程。
*
* 返回:新创建的调度执行器
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor()

/**
* 创建一个单线程执行程序(executor),可以安排命令在给定延迟后运行,或定期执行。
* (但请注意,如果该单线程在关闭前的执行过程中因失败而终止,而后续还有任务需要执行,那么新的线程将取代它。)
* 任务所有保证按顺序执行,并且在任何给定时间都不会有超过一个任务处于活动状态。
* 与其他等效的newScheduledThreadPool(1, threadFactory)不同,返回的executor保证不能重新配置以使用其他线程。
*
* 参数:threadFactory – 创建新线程时使用的工厂
* 返回:一个新创建的调度执行器
* 抛出:NullPointerException – 如果threadFactory为空
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

newScheduledThreadPool

执行延时或定期任务的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 创建一个线程池,可以安排命令在给定延迟后运行,或定期执行。
*
* 参数: corePoolSize - 保留在池中的线程数,即使它们是空闲的
* 返回:一个新创建的调度线程池
* 抛出: IllegalArgumentException – 如果 corePoolSize < 0
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

/**
* 创建一个线程池,可以安排命令在给定延迟后运行,或定期执行。
*
* 参数: corePoolSize - 保留在池中的线程数,即使它们是空闲的
* threadFactory - 执行程序创建新线程时使用的工厂
* 返回:一个新创建的调度线程池
* 抛出: IllegalArgumentException – 如果 corePoolSize < 0
* NullPointerException – 如果 threadFactory 为 null
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

注意:这里用的是 java.util.concurrent.ScheduledExecutorService 接口的 schedule() 方法,不是 java.util.concurrent.ExecutorService 接口的 execute() 方法。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class Test {
public static void main(String[] args) throws InterruptedException {
// 创建新线程的默认线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 可用的处理器数量
int availableProcessorsNum = Runtime.getRuntime().availableProcessors();
System.out.println("可用的处理器数量: " + availableProcessorsNum);

// 创建一个单线程执行程序
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(threadFactory);
// 创建固定大小线程池,工作队列LinkedBlockingQueue
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(availableProcessorsNum, threadFactory);
// 创建缓存线程池,核心池大小0,最大线程数Integer.MAX_VALUE,当线程数大于核心池大小时空闲线程存活时间60s,工作队列SynchronousQueue,
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(threadFactory);

// 创建一个单线程执行程序,可以安排命令在给定延迟后运行,或定期执行。工作队列DelayedWorkQueue
ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
// 创建一个线程池,可以安排命令在给定延迟后运行,或定期执行。工作队列DelayedWorkQueue
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(availableProcessorsNum,
threadFactory);


// newSingleThreadScheduledExecutor是否只有一个线程工作?观察输出的线程名称,结果是一个工作线程
IntStream.range(0, 20).forEach(i -> {
ScheduledFuture<?> scheduledFuture = singleThreadScheduledExecutor.scheduleAtFixedRate(
new RunnableTask1(),
3,
5,
TimeUnit.SECONDS);
});

// newScheduledThreadPool是否固定数量?观察输出的线程名称,结果是固定数量
IntStream.range(0, 20).forEach(i -> {
ScheduledFuture<?> scheduledFuture = scheduledThreadPool.scheduleWithFixedDelay(
new RunnableTask1(),
3,
5,
TimeUnit.SECONDS);
});
}
}

class RunnableTask1 implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " RunnableTask1");
}
}

class CallableTask1 implements Callable {
@Override
public Object call() throws Exception {
System.out.println(Thread.currentThread().getName() + " CallableTask1");
TimeUnit.MICROSECONDS.sleep(500000); // 1毫秒=1000微妙
return "CallableTask1 execute success!";
}
}

ThreadPoolExecutor类-推荐

ThreadPoolExecutor 类提供了4种构造方法,可根据需要来自定义一个线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadPoolExecutor extends AbstractExecutorService {

// 其他构造方法,省略...

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}

}

共7个参数,详细说明可参考类注释

  • corePoolSize:核心线程数,线程池中始终存活的线程数。

  • maximumPoolSize:最大线程数,线程池中允许的最大线程数。

  • keepAliveTime:空闲的非核心线程存活时间(非核心线程数 = 最大线程数 - 核心线程数),非核心线程空闲的时间超过设定值会被销毁。

  • unit:参数 keepAliveTime 的时间单位。

  • workQueue:一个阻塞的任务队列,用来存储等待执行的任务,均为线程安全,7种可选。

    任务拒绝策略 描述
    ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列。
    LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列。
    SynchronousQueue 一个不存储元素的阻塞队列,即直接提交给线程不保持它们。(同步队列)
    PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列。
    DelayQueue 一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
    LinkedTransferQueue 一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
    LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。

    较常用的是 LinkedBlockingQueue 和 Synchronous 。线程池的排队策略与 BlockingQueue 有关。

  • threadFactory:线程工厂,通过 ThreadFactory 接口可自定义创建线程的行为。以下是默认的线程工厂 Executors.defaultThreadFactory()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class CustomThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    CustomThreadFactory() {
    SecurityManager s = System.getSecurityManager();
    group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); // 线程组
    namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
    String threadName = namePrefix + threadNumber.getAndIncrement(); // 自定义线程名称,方便查看日志
    Thread thread = new Thread(group, r, threadName, 0);
    if (thread.isDaemon())
    thread.setDaemon(false); // 非守护线程
    if (thread.getPriority() != Thread.NORM_PRIORITY)
    thread.setPriority(Thread.NORM_PRIORITY); // 优先级5
    return thread;
    }
    }
  • handler:任务拒绝策略(提交给线程池的任务被拒绝的处理策略),4种可选,AbortPolicy 用的最多,也可自定义。

    在什么情况下提交给线程池的任务会被拒绝呢?同时满足以下4种情况:线程池中的线程已满、无法再继续扩容、没有空闲线程、任务队列已满。

    参数 描述
    AbortPolicy 默认的拒绝策略,抛出 RejectedExecutionException 异常。
    DiscardPolicy 直接丢弃任务。
    DiscardOldestPolicy 丢弃处于任务队列头部(最旧)的任务,添加被拒绝的任务。(队列:队尾入队,队头出队,先进先出)
    CallerRunsPolicy 使用调用者的线程直接执行被拒绝的任务。

ScheduledThreadPoolExecutor类

调度线程池:具备执行定时、延时、周期性任务的线程池。

4个构造方法都无法指定任务队列,默认是专门的延迟队列 DelayedWorkQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {

// 其他构造方法,省略...

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

}

接口方法源码注释见上文 ScheduledExecutorService 接口。

执行周期性任务的两个方法的区别:

scheduleAtFixedRate scheduleWithFixedDelay
周期 固定时间 间隔时间

image-20241130230853124

示例:

1
// 已fork,见github

ForkJoinPool类

什么是 ForkJoin 框架?

ForkJoin 是一个把大任务分割成若干个小任务,直到分割的足够小,再对每个小任务得到的结果进行汇总,得到大任务结果的框架,这种方式远比用一个线程执行大任务要高效的多。

Fork:表示分割任务。Join:表示合并结果。

image-20241130231616596

ForkJoinPool 是一个采用 ForkJoin 框架的线程池

1
2
3
public class ForkJoinPool extends AbstractExecutorService {

}

相关的类:

描述
ForkJoinPool ForkJoin 线程池
ForkJoinTask 职责相当于 Future
RecursiveTask(ForkJoinTask子类) 有返回值任务(相当于Callable)
RecursiveAction(ForkJoinTask子类) 无返回值任务(相当于Runnable)

从 1 加到 100 的 ForkJoin 图解:

image-20241130233415821

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class Test {
public static void main(String[] args) {
// 创建任务
Task task = new Task(1, 100);
// 创建 ForkJoin 线程池
ForkJoinPool threadPool = new ForkJoinPool();
// 提交任务
ForkJoinTask<Integer> future = threadPool.submit(task);
try {
// 获取结果
Integer result = future.get();
// 输出结果
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
// 关闭线程池
threadPool.shutdown();
}
}
}

class Task extends RecursiveTask<Integer> {
/**
* 起始值
*/
private int start;
/**
* 结束值
*/
private int end;
/**
* 临界值
*/
private int temp = 10;

public Task(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
// 当开头与结尾之差小于临界值时
if ((end - start) < temp) {
// 记录计算结果
int sum = 0;
// 累加开头-结尾的值
for (int i = start; i <= end; i++) {
sum += i;
}
// 返回结果
return sum;
} else {
// 取中间值
int middle = (start + end) / 2;
// 计算开头-中间
Task task1 = new Task(start, middle);
// 向线程池中添加此任务
task1.fork();
// 计算中间-结尾
Task task2 = new Task(middle + 1, end);
// 向线程池中添加此任务
task2.fork();
// 合并结果
return task1.join() + task2.join();
}
}
}

线程池的使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class Test {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

System.out.println("------------------------------------------------------------");

Future<?> resultRunnableTask1 = executor.submit(new RunnableTask1());
Future<?> resultRunnableTask2 = executor.submit(new RunnableTask2(), "OK");
Future<Integer> resultCallableTask1 = executor.submit(new CallableTask1());

try {
System.out.println(resultRunnableTask1.get()); // null
System.out.println(resultRunnableTask1.get() == null); // true
System.out.println(resultRunnableTask2.get()); // OK
System.out.println(resultCallableTask1.get()); // CallableTask1 execute success!
} catch (ExecutionException e) {
e.printStackTrace();
}

System.out.println("------------------------------------------------------------");

Collection<Callable<String>> collection = new ArrayList();
collection.add(new CallableTask1());
collection.add(new CallableTask2());
List<Future<String>> futureList = executor.invokeAll(collection);
// List<Future<String>> futureList = executor.invokeAll(collection, 10, TimeUnit.MILLISECONDS);
futureList.forEach(ele -> {
try {
System.out.println("resultInvokeAll: " + ele.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});

System.out.println("------------------------------------------------------------");

try {
// String resultInvokeAny = executor.invokeAny(collection);
String resultInvokeAny = executor.invokeAny(collection, 10, TimeUnit.MILLISECONDS);
System.out.println("resultInvokeAny: " + resultInvokeAny);
} catch (ExecutionException | TimeoutException e) {
e.printStackTrace();
}

System.out.println("------------------------------------------------------------");

try {
//close pool
executor.shutdown();
// 只是为了等待一段时间,之后在finally中判断,若有未完成的则关闭
executor.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (!executor.isTerminated()) {
List<Runnable> runList = executor.shutdownNow();
System.out.println("从未开始执行的任务列表: " + runList.size());
}
}
}
}

class RunnableTask1 implements Runnable {
@Override
public void run() {
System.out.println("RunnableTask1");
}
}

class RunnableTask2 implements Runnable {
@Override
public void run() {
System.out.println("RunnableTask2");
}
}

class CallableTask1 implements Callable {
@Override
public Object call() throws Exception {
System.out.println("CallableTask1");
TimeUnit.MICROSECONDS.sleep(500000); // 1毫秒=1000微妙
return "CallableTask1 execute success!";
}
}

class CallableTask2 implements Callable {
@Override
public String call() throws Exception {
System.out.println("CallableTask2");
TimeUnit.SECONDS.sleep(1);
return "CallableTask2 execute success!";
}
}

execute和submit区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
- 归属不同接口
- 返回值类型不同
- 参数不同
*/

public interface Executor {
void execute(Runnable command);
}

public interface ExecutorService extends Executor {
/**
* Runnable任务无返回值,为什么还要返回一个Future对象呢?
* 因为Future除了获取任务执行结果以外,还可以观察任务执行状态、取消任务等操作,所以返回的Future可以选择是否接收。
* submit方法支持提交Runnable任务并指定执行结果,若未指定则默认执行结果为null。
*/
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task); // 提交Callable任务
}

Future UML 类图

image-20241130141530423

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public interface Future<V> {
/**
* 尝试取消任务
* 方法返回true,调用get将引发CancellationException。
* 若果任务未执行,则该任务永远不会运行。方法返回true。
* 若果任务正在执行,则mayInterruptIfRunning参数表示是否中断正在执行的任务。无论是否中断,此方法都返回true。
* 方法返回false
* 若果任务已完成或已取消,或者由于某些其他原因无法取消,调用该方法无效。方法返回false。
* 该方法的返回值并不一定表明任务现在是否被标记为取消,应该使用isCancelled进行判断(注意:任务取消并不代表任务结束执行)。
*
* 参数:mayInterruptIfRunning – 表示是否中断正在执行的任务。
* true :任务被标记为取消。若任务中响应线程中断指令Thread.interrupted(),则任务被中断;否则继续执行。所以为true时任务不一定被中断。
* false:任务被标记为取消。但任务依然会继续执行直到结束。
* 返回:如果任务无法取消,则false,通常是因为任务已经完成;否则为true。
* 如果两个或多个线程导致任务被取消,则至少其中一个返回true。实施可以提供更有力的保证。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 阻塞式获取任务执行结果
* 抛出: CancellationException – 如果任务被取消
* ExecutionException – 如果任务抛出异常
* InterruptedException – 如果当前线程在等待时被中断
*
* 注意: 这里的InterruptedException不是任务抛出的,而是调用get()方法的线程。Future没有Thread任务中的InterruptedException,
* 只有CancellationException。
*/
V get() throws InterruptedException, ExecutionException;
/**
* 在指定时间内阻塞式获取任务执行结果,若超时则抛出异常
* 抛出:CancellationException – 如果任务被取消
* ExecutionException – 如果任务抛出异常
* InterruptedException – 如果当前线程在等待时被中断
* TimeoutException – 如果等待超时
*
* 注意: 这里的InterruptedException不是任务抛出的,而是调用get()方法的线程。Future没有Thread任务中的InterruptedException,
* 只有CancellationException。
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
/**
* 如果此任务在正常完成之前被取消,则返回true。
*/
boolean isCancelled();
/**
* 如果此任务完成则返回true。完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法都将返回true。
*/
boolean isDone();
}

线程池的关闭和运行状态

当使用完成 ExecutorService 后应该关闭它,否则它里面的线程会一直处于运行状态(即使没有提交任何任务),导致JVM无法关闭。举个例子,如果的应用程序是通过 main() 方法启动的,在这个 main() 退出之后,如果应用程序中的 ExecutorService 没有关闭,系统将一直运行。

两种关闭线程池的方法对比,实际开发中大多使用 shutdown()

ExecutorService.shutdown() ExecutorService.shutdownNow()
立即关闭线程池 false true
延时关闭线程池 true false
不再接收新任务(任务拒绝策略) true true
继续正在执行的任务 true 中断能够响应interrupt的任务
执行队列中的任务 true false
返回队列中的任务列表(未执行任务) false true
线程池状态 SHUTDOWN STOP

线程池的状态,定义在 ThreadPoolExecutor 类中,包装了两个概念字段:

1
2
3
4
5
6
7
8
9
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
  • workerCount:表示池中的线程数(或者叫工作线程数)。

  • runState:表示线程池的运行状态,提供主要的生命周期控制。

    运行状态 描述
    RUNNING 接受新任务,并处理排队任务
    SHUTDOWN 不接受新任务,但处理排队任务,继续正在执行的任务。当所有任务都执行完,转为TIDYING状态。
    STOP 不接受新任务,不处理排队任务,并中断正在进行的任务。当正在执行的任务被标记为中断状态后,转为TIDYING状态。
    TIDYING 所有任务都已终止,workerCount为零,转换到TIDYING状态的线程将运行termination()钩子方法。
    TERMINATED 线程池彻底关闭

runState 的状态转换,具体见 ThreadPoolExecutor 类:

image-20241130192011239

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Test {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();

IntStream.range(0, 100).forEach(i -> executor.execute(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("finished: " + threadName + "\t" + i);
while (true) {
if (Thread.interrupted()) {
System.out.println("线程被中断");
break;
}
}
}));

try {
// close pool
executor.shutdown();

System.out.println("线程池状态是否 >= SHUTDOWN: " + executor.isShutdown()); // true
System.out.println("线程池状态是否 >= TERMINATED: " + executor.isTerminated()); // false
System.out.println("线程池状态是否 >= TERMINATED: " + executor.awaitTermination(3, TimeUnit.SECONDS)); // false
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (!executor.isTerminated()) {
List<Runnable> runList = executor.shutdownNow();
System.out.println("从未开始执行的任务列表: " + runList.size());
}
// Thread.sleep(3000);

System.out.println("线程池状态是否 >= SHUTDOWN: " + executor.isShutdown()); // true
System.out.println("线程池状态是否 >= TERMINATED: " + executor.isTerminated()); // true
System.out.println("线程池状态是否 >= TERMINATED: " + executor.awaitTermination(3, TimeUnit.SECONDS)); // true
}
}
}

线程池是怎样执行任务的?

提交任务的方式有两种:

  • ThreadPoolExecutor 类实现的 Executor 接口的 void execute(Runnable command); 方法。
  • AbstractExecutorService 抽象类实现的 ExecutorService 接口的 Future<?> submit(Runnable task); 方法。

它们最终处理任务的方法都是 ThreadPoolExecutor 类中实现的 execute 方法。

submit 方法的实现源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public abstract class AbstractExecutorService implements ExecutorService {
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); // 转为有返回值任务
execute(ftask); // 此处调用的是 ThreadPoolExecutor 类中实现的 execute 方法
return ftask; // 返回 Future 对象,调用者根据该对象进行 获取任务执行结果、观察任务执行状态、取消任务 等操作
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result); // 转为有返回值任务
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}

execute 方法的实现源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 核心线程是否已满(workerCountOf 获取当前线程池中有多少线程)
// addWorker方法添加线程,使用了CAS操作保证线程安全
// 第一个参数:指定线程任务,在线程刚创建时就可以指定任务
// 第二个参数:指定该线程是否为核心线程,true代表是核心线程。核心线程不会在空闲时被销毁,单独设置允许销毁核心线程除外
if (addWorker(command, true))
return; // 添加成功就会执行任务,直接return。若核心线程已满,则添加失败。
c = ctl.get(); // 获取线程池状态和池中线程数,为后续流程做准备。
}
if (isRunning(c) && workQueue.offer(command)) { // 当核心线程已满时,如果线程池还在运行,则将任务添加至任务队列中(队列未满则添加成功)
int recheck = ctl.get(); // 再次获取线程池状态
if (! isRunning(recheck) && remove(command)) // 线程池如果不再运行,则从队列移除任务
reject(command); // 移除成功,则拒绝任务
else if (workerCountOf(recheck) == 0) // 池中可用线程数是否为零
addWorker(null, false); // 池中没有可用线程,则添加空任务的非核心线程
} else if (!addWorker(command, false)) // 当核心线程已满,且任务队列已满时,再次尝试添加线程,达到最大线程数则添加失败
reject(command); // 核心线程已满,任务队列已满,且达到最大线程数,则拒绝任务
}
}

流程图:

image-20241130220905236

CompletionService

如果需求是按任务完成时间先后顺序返回执行结果时,推荐使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class Test {
public static void main(String[] args) {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 创建 ExecutorCompletionService 对象
ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(threadPool);
// 提交多个任务
for (int i = 5; i >= 1; i--) {
// 创建任务
Task task = new Task(i);
// 提交任务
completionService.submit(task);
}
try {
// 获取多个任务执行结果
for (int i = 0; i < 5; i++) {
// 获取 Future
Future<Integer> future = completionService.take();
// 获取执行结果
int result = future.get();
// 输出执行结果
System.out.println("返回顺序:" + result);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
// 关闭线程池
threadPool.shutdown();
}
}
}

class Task implements Callable<Integer> {
/**
* 执行时间
*/
private int order;

public Task(int order) {
this.order = order;
}

@Override
public Integer call() throws Exception {
// 输出执行时间
System.out.println("执行顺序:" + order);
// 使当前线程休眠指定时间
Thread.sleep(order * 1000L);
// 返回执行时间
return order;
}
}

如何监控线程池?

自定义带监控功能的线程池,继承 ThreadPoolExecutor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class MonitorThreadPool extends ThreadPoolExecutor {

public MonitorThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}

/**
* 每次执行任务前调用
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
monitor("beforeExecute");
}

/**
* 每次任务完成后调用
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
monitor("afterExecute");
}

/**
* 线程池关闭前调用
*/
@Override
protected void terminated() {
monitor("terminated ");
}

/**
* 监控线程池情况
* 实际开发中,一般写到log,或写到服务器某块儿地方,通过客户端随时查看
*/
public void monitor(String ExecutePos) {
System.out.print(Thread.currentThread().getName() + "\t"); // 线程名称
System.out.print(ExecutePos + "\t"); // 执行时机
// 监控线程的变化情况
System.out.print("正在工作线程数:" + getActiveCount() + "\t");
System.out.print("当前存在线程数:" + getPoolSize() + "\t");
System.out.print("历史最大线程数:" + getLargestPoolSize() + "\t");
// 监控任务的变化情况
System.out.print("已提交任务数:" + getTaskCount() + "\t");
System.out.print("已完成任务数:" + getCompletedTaskCount() + "\t");
System.out.println("队列中任务数:" + getQueue().size());
}
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class Test {
public static void main(String[] args) {
Thread.currentThread().setName(Thread.currentThread().getName() + " "); // 为了让控制台输出对齐
// 创建带监控的线程池
MonitorThreadPool threadPool =
new MonitorThreadPool(1, 3, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new AbortPolicy());
try {
// 提交多个任务
for (int i = 5; i > 0; i--) {
// 创建任务
Task task = new Task(i);
// 提交任务
threadPool.submit(task);
// 每隔500毫秒提交一个
Thread.sleep(500);
}
// 使主线程休眠6秒钟,目的是在关闭线程池之前获取一次情况
Thread.sleep(6000);
threadPool.monitor("beforeShutdown");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭线程池
threadPool.shutdown();
}
}
}

class Task implements Runnable {
/**
* 执行时间
*/
private int timeout;

public Task(int timeout) {
this.timeout = timeout;
}

@Override
public void run() {
try {
// 使当前线程休眠指定时间
Thread.sleep(timeout * 1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
pool-1-thread-1	beforeExecute	正在工作线程数:1	当前存在线程数:1	历史最大线程数:1	已提交任务数:1	已完成任务数:0	队列中任务数:0
pool-1-thread-2 beforeExecute 正在工作线程数:2 当前存在线程数:2 历史最大线程数:2 已提交任务数:4 已完成任务数:0 队列中任务数:2
pool-1-thread-3 beforeExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:0 队列中任务数:2
pool-1-thread-3 afterExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:0 队列中任务数:2
pool-1-thread-3 beforeExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:1 队列中任务数:1
pool-1-thread-2 afterExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:1 队列中任务数:1
pool-1-thread-2 beforeExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:2 队列中任务数:0
pool-1-thread-1 afterExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:2 队列中任务数:0
pool-1-thread-2 afterExecute 正在工作线程数:2 当前存在线程数:2 历史最大线程数:3 已提交任务数:5 已完成任务数:3 队列中任务数:0
pool-1-thread-3 afterExecute 正在工作线程数:1 当前存在线程数:1 历史最大线程数:3 已提交任务数:5 已完成任务数:4 队列中任务数:0
main beforeShutdown 正在工作线程数:0 当前存在线程数:1 历史最大线程数:3 已提交任务数:5 已完成任务数:5 队列中任务数:0
pool-1-thread-3 terminated 正在工作线程数:0 当前存在线程数:0 历史最大线程数:3 已提交任务数:5 已完成任务数:5 队列中任务数:0