推荐:
视频教程:https://www.bilibili.com/video/BV1wh411e7nd
对应代码:https://github.com/hellozhaolq/2021-Java-ThreadPool-Tutorial
对于数据库连接,我们经常听到数据库连接池 这个概念。因为建立数据库连接是非常耗时的一个操作,其中涉及到网络IO的一些操作。因此就想出把连接通过一个连接池来管理。需要连接的话,就从连接池里取一个。当使用完了,就“关闭”连接,这不是真正意义上的关闭,只是把连接放回池里,供其他人在使用。所以对于线程,也有了线程池 这个概念,其中的原理和数据库连接池差不多。
程序启动一个新线程成本是比较高的,因为它涉及到要与操作系统进行交互。而使用线程池可以很好的提高性能,尤其是当程序中要创建大量生存期很短的线程时,更应该考虑使用线程池。线程池里的每一个线程代码结束后,并不会死亡,而是再次回到线程池中成为空闲状态,等待下一个对象来使用。
在 JDK5 之前,我们必须手动实现自己的线程池,从JDK5开始,Java内置支持线程池。
作用 线程池就是限制系统中使用线程的数量以及更好的使用线程
根据系统的运行情况,可以自动或手动设置线程数量,达到运行的最佳效果:配置少了,将影响系统的执行效率,配置多了,又会浪费系统的资源。用线程池配置数量,其他线程排队等候。当一个任务执行完毕后,就从队列中取一个新任务运行,如果没有新任务,那么这个线程将等待。如果来了一个新任务,但是没有空闲线程的话,那么把任务加入到等待队列中。
为什么要用线程池?
减少线程创建和销毁的次数,使线程可以多次复用
可以根据系统情况,调整线程的数量。防止创建过多的线程,消耗过多的内存(每个线程1M左右)
线程池 UML 类图
Executor Java 里面线程池的顶级接口是 java.util.concurrent.Executor
,但是严格意义上讲 Executor 并不是一个线程池,它只是一个执行线程的工具。
真正的线程池接口是 java.util.concurrent.ExecutorService
。
1 2 3 4 5 6 7 8 9 10 public interface Executor { void execute (Runnable command) ; }
ExecutorService ExecutorService 对象表示一个线程池,可以执行 Runnable 对象或者 Callable 对象代表的线程。
全部接口方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 public interface ExecutorService extends Executor { void shutdown () ; List<Runnable> shutdownNow () ; boolean isShutdown () ; boolean isTerminated () ; boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException; Future<?> submit(Runnable task); <T> Future<T> submit (Runnable task, T result) ; <T> Future<T> submit (Callable<T> task) ; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
AbstractExecutorService 提供 ExecutorService 执行方法的默认实现。该类使用 newTaskFor(返回 RunnableFuture)实现了 submit 、 invokeAny 和 invokeAll 方法,默认为 java.util.concurrent
包中提供的 FutureTask 类。例如, submit(Runnable) 的实现创建一个执行并返回的关联 RunnableFuture 。子类可以重写 newTaskFor方法以返回除 FutureTask 之外的 RunnableFuture 实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor (Runnable runnable, T value) { return new FutureTask <T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor (Callable<T> callable) { return new FutureTask <T>(callable); } }
ScheduledExecutorService 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public interface ScheduledExecutorService extends ExecutorService { public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule (Callable<V> callable, long delay, TimeUnit unit) ; public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
线程池的创建 https://www.cnblogs.com/pcheng/p/13540619.html
https://www.cnblogs.com/vhua/tag/%E5%B9%B6%E5%8F%91/
Java 中创建线程池主要有两种方法:
通过 Executors 工厂类创建,该类提供了4种不同的线程池可供使用。
通过 ThreadPoolExecutor 类进行自定义创建, 推荐 。
Executors工厂类-不推荐 java.util.concurrent.Executors
类提供了 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。该类支持以下几种方法:
创建并返回常用配置的 ExecutorService 的方法。
创建并返回常用配置的 ScheduledExecutorService 的方法。
创建并返回“包装的”ExecutorService 的方法,该方法通过使特定于实现的方法不可访问来禁用重新配置。
创建并返回ThreadFactory的方法,该 ThreadFactory 将新创建的线程设置为已知状态。
从其他类似闭包的形式创建并返回Callable方法,因此它们可以在需要Callable执行方法中使用。
创建线程工厂 Executors 提供两个线程工厂(ThreadFactory接口)的实现类及工厂方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public static ThreadFactory defaultThreadFactory () public static ThreadFactory privilegedThreadFactory ()
newFixedThreadPool-慎用 固定大小的线程池,允许的请求队列 LinkedBlockingQueue 容量为 Integer.MAX_VALUE
( 2^31≈21亿5千万),可能会堆积大量的请求,从而导致OOM。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public static ExecutorService newFixedThreadPool (int nThreads) public static ExecutorService newFixedThreadPool (int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(), threadFactory); }
newSingleThreadExecutor-慎用 单个线程的线程池,允许的请求队列 LinkedBlockingQueue 容量为 Integer.MAX_VALUE
( 2^31≈21亿5千万),可能会堆积大量的请求,从而导致OOM。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static ExecutorService newSingleThreadExecutor () public static ExecutorService newSingleThreadExecutor (ThreadFactory threadFactory) { return new AutoShutdownDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(), threadFactory)); }
newCachedThreadPool-慎用 可缓存的线程池,使用 SynchronousQueue,允许创建的线程数量为 Integer.MAX_VALUE
( 2^31≈21亿5千万),可能会创建大量的线程,从而导致OOM。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static ExecutorService newCachedThreadPool () public static ExecutorService newCachedThreadPool (ThreadFactory threadFactory) { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>(), threadFactory); }
newWorkStealingPool fork-join 形式的线程池
1 2 3 4 5 6 7 8 9 public static ExecutorService newWorkStealingPool (int parallelism)
newSingleThreadScheduledExecutor 执行延时或定期任务的单个线程的线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static ScheduledExecutorService newSingleThreadScheduledExecutor () public static ScheduledExecutorService newSingleThreadScheduledExecutor (ThreadFactory threadFactory)
newScheduledThreadPool 执行延时或定期任务的线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize, ThreadFactory threadFactory)
注意:这里用的是 java.util.concurrent.ScheduledExecutorService
接口的 schedule() 方法,不是 java.util.concurrent.ExecutorService
接口的 execute() 方法。
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public class Test { public static void main (String[] args) throws InterruptedException { ThreadFactory threadFactory = Executors.defaultThreadFactory(); int availableProcessorsNum = Runtime.getRuntime().availableProcessors(); System.out.println("可用的处理器数量: " + availableProcessorsNum); ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(threadFactory); ExecutorService fixedThreadPool = Executors.newFixedThreadPool(availableProcessorsNum, threadFactory); ExecutorService cachedThreadPool = Executors.newCachedThreadPool(threadFactory); ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(availableProcessorsNum, threadFactory); IntStream.range(0 , 20 ).forEach(i -> { ScheduledFuture<?> scheduledFuture = singleThreadScheduledExecutor.scheduleAtFixedRate( new RunnableTask1 (), 3 , 5 , TimeUnit.SECONDS); }); IntStream.range(0 , 20 ).forEach(i -> { ScheduledFuture<?> scheduledFuture = scheduledThreadPool.scheduleWithFixedDelay( new RunnableTask1 (), 3 , 5 , TimeUnit.SECONDS); }); } } class RunnableTask1 implements Runnable { @Override public void run () { System.out.println(Thread.currentThread().getName() + " RunnableTask1" ); } } class CallableTask1 implements Callable { @Override public Object call () throws Exception { System.out.println(Thread.currentThread().getName() + " CallableTask1" ); TimeUnit.MICROSECONDS.sleep(500000 ); return "CallableTask1 execute success!" ; } }
ThreadPoolExecutor类-推荐 ThreadPoolExecutor 类提供了4种构造方法,可根据需要来自定义一个线程池。
1 2 3 4 5 6 7 8 9 10 11 12 13 public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {} }
共7个参数,详细说明可参考类注释 :
corePoolSize:核心线程数,线程池中始终存活的线程数。
maximumPoolSize:最大线程数,线程池中允许的最大线程数。
keepAliveTime:空闲的非核心线程存活时间(非核心线程数 = 最大线程数 - 核心线程数),非核心线程空闲的时间超过设定值会被销毁。
unit:参数 keepAliveTime 的时间单位。
workQueue:一个阻塞的任务队列,用来存储等待执行的任务,均为线程安全,7种可选。
任务拒绝策略
描述
ArrayBlockingQueue
一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue
一个由链表结构组成的有界阻塞队列。
SynchronousQueue
一个不存储元素的阻塞队列,即直接提交给线程不保持它们。(同步队列)
PriorityBlockingQueue
一个支持优先级排序的无界阻塞队列。
DelayQueue
一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
LinkedTransferQueue
一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
LinkedBlockingDeque
一个由链表结构组成的双向阻塞队列。
较常用的是 LinkedBlockingQueue 和 Synchronous 。线程池的排队策略与 BlockingQueue 有关。
threadFactory:线程工厂,通过 ThreadFactory 接口可自定义创建线程的行为。以下是默认的线程工厂 Executors.defaultThreadFactory()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class CustomThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger (1 ); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger (1 ); private final String namePrefix; CustomThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null ) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ; } @Override public Thread newThread (Runnable r) { String threadName = namePrefix + threadNumber.getAndIncrement(); Thread thread = new Thread (group, r, threadName, 0 ); if (thread.isDaemon()) thread.setDaemon(false ); if (thread.getPriority() != Thread.NORM_PRIORITY) thread.setPriority(Thread.NORM_PRIORITY); return thread; } }
handler:任务拒绝策略(提交给线程池的任务被拒绝的处理策略),4种可选,AbortPolicy 用的最多,也可自定义。
在什么情况下提交给线程池的任务会被拒绝呢?同时满足以下4种情况:线程池中的线程已满、无法再继续扩容、没有空闲线程、任务队列已满。
参数
描述
AbortPolicy
默认的拒绝策略,抛出 RejectedExecutionException 异常。
DiscardPolicy
直接丢弃任务。
DiscardOldestPolicy
丢弃处于任务队列头部(最旧)的任务,添加被拒绝的任务。(队列:队尾入队,队头出队,先进先出)
CallerRunsPolicy
使用调用者的线程直接执行被拒绝的任务。
ScheduledThreadPoolExecutor类 调度线程池: 具备执行定时、延时、周期性任务的线程池。
4个构造方法都无法指定任务队列,默认是专门的延迟队列 DelayedWorkQueue
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { public ScheduledThreadPoolExecutor (int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super (corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue (), threadFactory, handler); } }
接口方法源码注释见上文 ScheduledExecutorService
接口。
执行周期性任务的两个方法的区别:
scheduleAtFixedRate
scheduleWithFixedDelay
周期
固定时间
间隔时间
示例:
ForkJoinPool类 什么是 ForkJoin 框架?
ForkJoin 是一个把大任务分割成若干个小任务 ,直到分割的足够小,再对每个小任务得到的结果进行汇总 ,得到大任务结果的框架,这种方式远比用一个线程执行大任务要高效的多。
Fork:表示分割任务。Join:表示合并结果。
ForkJoinPool 是一个采用 ForkJoin 框架的线程池
1 2 3 public class ForkJoinPool extends AbstractExecutorService { }
相关的类:
类
描述
ForkJoinPool
ForkJoin 线程池
ForkJoinTask
职责相当于 Future
RecursiveTask(ForkJoinTask子类)
有返回值任务(相当于Callable)
RecursiveAction(ForkJoinTask子类)
无返回值任务(相当于Runnable)
从 1 加到 100 的 ForkJoin 图解:
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public class Test { public static void main (String[] args) { Task task = new Task (1 , 100 ); ForkJoinPool threadPool = new ForkJoinPool (); ForkJoinTask<Integer> future = threadPool.submit(task); try { Integer result = future.get(); System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } } class Task extends RecursiveTask <Integer> { private int start; private int end; private int temp = 10 ; public Task (int start, int end) { this .start = start; this .end = end; } @Override protected Integer compute () { if ((end - start) < temp) { int sum = 0 ; for (int i = start; i <= end; i++) { sum += i; } return sum; } else { int middle = (start + end) / 2 ; Task task1 = new Task (start, middle); task1.fork(); Task task2 = new Task (middle + 1 , end); task2.fork(); return task1.join() + task2.join(); } } }
线程池的使用示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 public class Test { public static void main (String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); System.out.println("------------------------------------------------------------" ); Future<?> resultRunnableTask1 = executor.submit(new RunnableTask1 ()); Future<?> resultRunnableTask2 = executor.submit(new RunnableTask2 (), "OK" ); Future<Integer> resultCallableTask1 = executor.submit(new CallableTask1 ()); try { System.out.println(resultRunnableTask1.get()); System.out.println(resultRunnableTask1.get() == null ); System.out.println(resultRunnableTask2.get()); System.out.println(resultCallableTask1.get()); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("------------------------------------------------------------" ); Collection<Callable<String>> collection = new ArrayList (); collection.add(new CallableTask1 ()); collection.add(new CallableTask2 ()); List<Future<String>> futureList = executor.invokeAll(collection); futureList.forEach(ele -> { try { System.out.println("resultInvokeAll: " + ele.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); System.out.println("------------------------------------------------------------" ); try { String resultInvokeAny = executor.invokeAny(collection, 10 , TimeUnit.MILLISECONDS); System.out.println("resultInvokeAny: " + resultInvokeAny); } catch (ExecutionException | TimeoutException e) { e.printStackTrace(); } System.out.println("------------------------------------------------------------" ); try { executor.shutdown(); executor.awaitTermination(3 , TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (!executor.isTerminated()) { List<Runnable> runList = executor.shutdownNow(); System.out.println("从未开始执行的任务列表: " + runList.size()); } } } } class RunnableTask1 implements Runnable { @Override public void run () { System.out.println("RunnableTask1" ); } } class RunnableTask2 implements Runnable { @Override public void run () { System.out.println("RunnableTask2" ); } } class CallableTask1 implements Callable { @Override public Object call () throws Exception { System.out.println("CallableTask1" ); TimeUnit.MICROSECONDS.sleep(500000 ); return "CallableTask1 execute success!" ; } } class CallableTask2 implements Callable { @Override public String call () throws Exception { System.out.println("CallableTask2" ); TimeUnit.SECONDS.sleep(1 ); return "CallableTask2 execute success!" ; } }
execute和submit区别 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public interface Executor { void execute (Runnable command) ; } public interface ExecutorService extends Executor { Future<?> submit(Runnable task); <T> Future<T> submit (Runnable task, T result) ; <T> Future<T> submit (Callable<T> task) ; }
Future UML 类图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public interface Future <V> { boolean cancel (boolean mayInterruptIfRunning) ; V get () throws InterruptedException, ExecutionException; V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; boolean isCancelled () ; boolean isDone () ; }
线程池的关闭和运行状态 当使用完成 ExecutorService 后应该关闭它,否则它里面的线程会一直处于运行状态(即使没有提交任何任务),导致JVM无法关闭。举个例子,如果的应用程序是通过 main() 方法启动的,在这个 main() 退出之后,如果应用程序中的 ExecutorService 没有关闭,系统将一直运行。
两种关闭线程池的方法对比,实际开发中大多使用 shutdown() :
ExecutorService.shutdown()
ExecutorService.shutdownNow()
立即关闭线程池
false
true
延时关闭线程池
true
false
不再接收新任务(任务拒绝策略)
true
true
继续正在执行的任务
true
中断能够响应interrupt的任务
执行队列中的任务
true
false
返回队列中的任务列表(未执行任务)
false
true
线程池状态
SHUTDOWN
STOP
线程池的状态,定义在 ThreadPoolExecutor 类中,包装了两个概念字段:
1 2 3 4 5 6 7 8 9 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
runState 的状态转换,具体见 ThreadPoolExecutor 类:
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class Test { public static void main (String[] args) throws InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); IntStream.range(0 , 100 ).forEach(i -> executor.execute(() -> { String threadName = Thread.currentThread().getName(); System.out.println("finished: " + threadName + "\t" + i); while (true ) { if (Thread.interrupted()) { System.out.println("线程被中断" ); break ; } } })); try { executor.shutdown(); System.out.println("线程池状态是否 >= SHUTDOWN: " + executor.isShutdown()); System.out.println("线程池状态是否 >= TERMINATED: " + executor.isTerminated()); System.out.println("线程池状态是否 >= TERMINATED: " + executor.awaitTermination(3 , TimeUnit.SECONDS)); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (!executor.isTerminated()) { List<Runnable> runList = executor.shutdownNow(); System.out.println("从未开始执行的任务列表: " + runList.size()); } System.out.println("线程池状态是否 >= SHUTDOWN: " + executor.isShutdown()); System.out.println("线程池状态是否 >= TERMINATED: " + executor.isTerminated()); System.out.println("线程池状态是否 >= TERMINATED: " + executor.awaitTermination(3 , TimeUnit.SECONDS)); } } }
线程池是怎样执行任务的? 提交任务的方式有两种:
ThreadPoolExecutor 类实现的 Executor 接口的 void execute(Runnable command);
方法。
AbstractExecutorService 抽象类实现的 ExecutorService 接口的 Future<?> submit(Runnable task);
方法。
它们最终处理任务的方法都是 ThreadPoolExecutor 类中实现的 execute 方法。
submit 方法的实现源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public abstract class AbstractExecutorService implements ExecutorService { public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException (); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; } public <T> Future<T> submit (Runnable task, T result) { if (task == null ) throw new NullPointerException (); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit (Callable<T> task) { if (task == null ) throw new NullPointerException (); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } }
execute 方法的实现源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class ThreadPoolExecutor extends AbstractExecutorService { public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); } }
流程图:
CompletionService 如果需求是按任务完成时间先后顺序返回执行结果时,推荐使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class Test { public static void main (String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(5 ); ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService <>(threadPool); for (int i = 5 ; i >= 1 ; i--) { Task task = new Task (i); completionService.submit(task); } try { for (int i = 0 ; i < 5 ; i++) { Future<Integer> future = completionService.take(); int result = future.get(); System.out.println("返回顺序:" + result); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } } class Task implements Callable <Integer> { private int order; public Task (int order) { this .order = order; } @Override public Integer call () throws Exception { System.out.println("执行顺序:" + order); Thread.sleep(order * 1000L ); return order; } }
如何监控线程池? 自定义带监控功能的线程池,继承 ThreadPoolExecutor:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class MonitorThreadPool extends ThreadPoolExecutor { public MonitorThreadPool (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } @Override protected void beforeExecute (Thread t, Runnable r) { monitor("beforeExecute" ); } @Override protected void afterExecute (Runnable r, Throwable t) { monitor("afterExecute" ); } @Override protected void terminated () { monitor("terminated " ); } public void monitor (String ExecutePos) { System.out.print(Thread.currentThread().getName() + "\t" ); System.out.print(ExecutePos + "\t" ); System.out.print("正在工作线程数:" + getActiveCount() + "\t" ); System.out.print("当前存在线程数:" + getPoolSize() + "\t" ); System.out.print("历史最大线程数:" + getLargestPoolSize() + "\t" ); System.out.print("已提交任务数:" + getTaskCount() + "\t" ); System.out.print("已完成任务数:" + getCompletedTaskCount() + "\t" ); System.out.println("队列中任务数:" + getQueue().size()); } }
测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class Test { public static void main (String[] args) { Thread.currentThread().setName(Thread.currentThread().getName() + " " ); MonitorThreadPool threadPool = new MonitorThreadPool (1 , 3 , 0 , TimeUnit.SECONDS, new LinkedBlockingQueue <>(2 ), new AbortPolicy ()); try { for (int i = 5 ; i > 0 ; i--) { Task task = new Task (i); threadPool.submit(task); Thread.sleep(500 ); } Thread.sleep(6000 ); threadPool.monitor("beforeShutdown" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } } class Task implements Runnable { private int timeout; public Task (int timeout) { this .timeout = timeout; } @Override public void run () { try { Thread.sleep(timeout * 1000L ); } catch (InterruptedException e) { e.printStackTrace(); } } }
输出:
1 2 3 4 5 6 7 8 9 10 11 12 pool-1-thread-1 beforeExecute 正在工作线程数:1 当前存在线程数:1 历史最大线程数:1 已提交任务数:1 已完成任务数:0 队列中任务数:0 pool-1-thread-2 beforeExecute 正在工作线程数:2 当前存在线程数:2 历史最大线程数:2 已提交任务数:4 已完成任务数:0 队列中任务数:2 pool-1-thread-3 beforeExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:0 队列中任务数:2 pool-1-thread-3 afterExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:0 队列中任务数:2 pool-1-thread-3 beforeExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:1 队列中任务数:1 pool-1-thread-2 afterExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:1 队列中任务数:1 pool-1-thread-2 beforeExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:2 队列中任务数:0 pool-1-thread-1 afterExecute 正在工作线程数:3 当前存在线程数:3 历史最大线程数:3 已提交任务数:5 已完成任务数:2 队列中任务数:0 pool-1-thread-2 afterExecute 正在工作线程数:2 当前存在线程数:2 历史最大线程数:3 已提交任务数:5 已完成任务数:3 队列中任务数:0 pool-1-thread-3 afterExecute 正在工作线程数:1 当前存在线程数:1 历史最大线程数:3 已提交任务数:5 已完成任务数:4 队列中任务数:0 main beforeShutdown 正在工作线程数:0 当前存在线程数:1 历史最大线程数:3 已提交任务数:5 已完成任务数:5 队列中任务数:0 pool-1-thread-3 terminated 正在工作线程数:0 当前存在线程数:0 历史最大线程数:3 已提交任务数:5 已完成任务数:5 队列中任务数:0