Spring中@Async和@Scheduled注解、自定义线程池

简介

Spring中提供了 @Async 注解用于执行异步任务,而 @Schedule 注解用于执行定时任务,两个都有默认的线程池配置,也可以通过配置文件或者自定Bean的方式来更改默认配置。

想要使用 @Async@Scheduled 注解,需要在配置类、项目启动类、controller类、service类或其他由spring容器管理的bean类上添加注解: @EnableAsync@EnableScheduling

自定义异步任务线程池

配置文件

在 SpringBoot 中,可以在配置文件中指定线程池的配置,具体配置如下:

线程池 bean 的默认名称为 taskExecutor

1
2
3
4
5
6
7
8
9
10
11
spring:
task:
# ThreadPoolTaskExecutor配置(@EnableAsync注解开启)TaskExecutionProperties
execution:
pool:
core-size: 10 # 默认值8,核心线程数量,线程池创建时候初始化的线程数
max-size: 20 # 默认值2147483647,最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
queue-capacity: 1000 # 默认值2147483647,缓冲队列容量,用来缓冲执行任务的队列
keep-alive: 60s # 默认值60s,允许线程的空闲时间,超过核心线程数之外的线程在空闲时间到达后会被销毁
allow-core-thread-timeout: true # 默认值true,是否允许核心线程超时,可以实现线程池的动态增长和萎缩
thread-name-prefix: taskExecutor- # 默认值task-,线程池中TaskExecutor线程名的前缀,可以方便我们定位处理任务所在的线程池

实现AsyncConfigurer接口

另外一种方式是实现 AsyncConfigurer 接口,在方法 getAsyncExecutor 中,返回自定义的线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public TaskExecutor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize(); // 此方法必须调用,否则会抛出ThreadPoolTaskExecutor未初始化异常
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(60);
executor.setAllowCoreThreadTimeOut(true);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("taskExecutor-");
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new MyAsyncExceptionHandler();
}
}

自定义TaskExecutor

可以使用自定义 bean 的方式,来自定义 @Async 注解使用的线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@EnableAsync
@Configuration
class TaskPoolConfig {
// 若不指定bean名称,默认为方法名
@Bean("taskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize(); // 此方法必须调用,否则会抛出ThreadPoolTaskExecutor未初始化异常
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(60);
executor.setAllowCoreThreadTimeOut(true);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("taskExecutor-");
return executor;
}
}

@Async注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 将方法标记为异步执行候选的注解。可以在类级别使用,意味着该类中的所有方法都是异步的。
* 但请注意,@Async 不支持在 @Configuration 注解类中声明的方法。
*
* 在目标方法签名方面,支持任何参数类型,但返回类型被限制为 void 或 java.util.concurrent.Future。在后一种情况下,您可以声明更具
* 体的 org.springframework.util.concurrent.ListenableFuture 或 java.util.concurrent.CompletableFuture 类型,它
* 们允许与异步任务进行更丰富的交互并通过进一步的处理步骤进行即时组合。
*
* 从代理返回的 Future 将是一个实际的异步 Future,可用于跟踪异步方法执行的结果。但是,由于目标方法需要实现相同的签名,因此它必须返回一个临
* 时的 Future,该句柄只是传递一个值:例如Spring 的 AsyncResult、EJB 3.1 的 javax.ejb.AsyncResult 或 java.util.concurrent.CompletableFuture.completedFuture(Object)。
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

/**
* 指定异步操作的限定符值。
* 可用于确定执行异步操作时要使用的目标执行器,匹配特定 Executor 或 TaskExecutor bean 定义的限定符值(或 bean 名称)。
* 当在类级别的 @Async 注释上指定时,表示给定的执行器应该用于类中的所有方法。 Async#value 的方法级别使用始终覆盖在类级别设置的任何值。
*/
String value() default "";

}

注意:

  • @Async 注解默认使用 bean 名称为 taskExecutor 的线程池,允许指定自定义的线程池。
  • 在同一个类中,如果一个方法(包括 @Async 方法)调用另外一个 @Async 方法,第二个方法不会异步执行,这就是代理绕过问题
  • @Async 注解所在类必须由 spring 容器管理,调用 @Async 方法的实例必须是 spring 容器中的实例,否则不会异步。手动 new 的对象调用不会异步。

示例

Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
@RestController
public class DemoController {
@Autowired
private AsyncService asyncService;

@RequestMapping(value = "/asyncTask", method = RequestMethod.GET)
public String asyncTask(HttpServletRequest request) {
log.info("ThreadName: " + Thread.currentThread().getName() + ", MethodName: asyncTask");
CompletableFuture<String> future = asyncService.asyncMethod();
System.out.println("这里可以处理其他");
System.out.println("这里可以处理其他");
System.out.println("这里可以处理其他");
String result = future.get(); // 获取异步方法的处理结果
return result;
}
}

Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
@Service
public class AsyncService {
@Async("taskExecutor")
public CompletableFuture<String> asyncMethod() {
// 同一个类中调用@Async方法,这个方法不会异步执行
simpleAsyncMethod();

String name = "MethodName: asyncMethod";
log.info("ThreadName: " + Thread.currentThread().getName() + ", " + name);
process(name);
String processResult = "这是异步方法最终的处理结果";
return CompletableFuture.completedFuture(processResult);
}

@Async
public void simpleAsyncMethod() {
String name = "MethodName: simpleAsyncMethod";
log.info("ThreadName: " + Thread.currentThread().getName() + ", " + name);
process(name);
}

private void process(String methodName) {
int i = 1;
while (i <= 5) {
try {
Thread.sleep(1000);
log.info(methodName + ", process " + i++);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

结果:在一个线程中调用 asyncMethod 的方法是异步的,但是 asyncMethod 方法调用 simpleAsyncMethod 则不是异步的,会在同一个线程中执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
2022-01-13 12:07:23.568  INFO 27968 --- [nio-8855-exec-1] c.z.service.controller.DemoController    : ThreadName: http-nio-8855-exec-1, MethodName: asyncTask
2022-01-13 12:07:23.574 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : ThreadName: taskExecutor-1, MethodName: simpleAsyncMethod
2022-01-13 12:07:24.585 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: simpleAsyncMethod, process 1
2022-01-13 12:07:25.599 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: simpleAsyncMethod, process 2
2022-01-13 12:07:26.613 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: simpleAsyncMethod, process 3
2022-01-13 12:07:27.613 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: simpleAsyncMethod, process 4
2022-01-13 12:07:28.628 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: simpleAsyncMethod, process 5
2022-01-13 12:07:28.628 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : ThreadName: taskExecutor-1, MethodName: asyncMethod
2022-01-13 12:07:29.629 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: asyncMethod, process 1
2022-01-13 12:07:30.641 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: asyncMethod, process 2
2022-01-13 12:07:31.642 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: asyncMethod, process 3
2022-01-13 12:07:32.642 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: asyncMethod, process 4
2022-01-13 12:07:33.654 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: asyncMethod, process 5

异常处理

对于 @Async 注解的方法抛出的异常,如果需要特殊处理,则需要实现 AsyncUncaughtExceptionHandler 接口,在接口中对异常进行处理,代码如下:

1
2
3
4
5
6
7
@Slf4j
public class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("caught async exception", ex);
}
}

然后实现 AsyncConfigurer 类,在类中返回该 ExceptionHandler

1
2
3
4
5
6
7
8
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new MyAsyncExceptionHandler();
}
}

多个TaskExecutor实例

当容器中有多个 TaskExecutor 实例时,@Async 注解会使用名称为 taskExecutor 的那个bean,修改配置如下并请求接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@EnableAsync
@Configuration
class TaskPoolConfig {
@Bean("aaa")
// @Primary
public TaskExecutor aaa() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize(); // 此方法必须调用,否则会抛出ThreadPoolTaskExecutor未初始化异常
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(60);
executor.setAllowCoreThreadTimeOut(true);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("aaa-");
return executor;
}

@Bean("bbb")
public TaskExecutor bbb() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize(); // 此方法必须调用,否则会抛出ThreadPoolTaskExecutor未初始化异常
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(60);
executor.setAllowCoreThreadTimeOut(true);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("bbb-");
return executor;
}
}

注意控制台输出:

1
2022-01-13 14:53:59.181  INFO 29456 --- [nio-8855-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: [aaa, taskScheduler]

意思是:在上下文中发现了多个 TaskExecutor bean,并且没有一个被命名为“taskExecutor”。将其中一个标记为主要或将其命名为“taskExecutor”(可能作为别名),以便将其用于异步处理:[aaa,bbb, taskScheduler]

taskScheduler 这个是框架默认创建的,用来执行定时任务的线程池,也是 TaskExecutor 的实例。

自定义任务调度线程池

配置文件

1
2
3
4
5
6
7
spring:
task:
# ThreadPoolScheduleExecutor配置(@EnableScheduling注解开启)TaskExecutionProperties
scheduling:
pool:
size: 10 # 默认值1,允许的最大线程数
thread-name-prefix: taskScheduler- # 自定义TaskScheduling线程名称

实现SchedulingConfigurer接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@EnableScheduling
@Configuration
public class SchedulingConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
// 此方法必须调用,否则会抛出ThreadPoolTaskScheduler未初始化异常
scheduler.initialize();
scheduler.setPoolSize(10);
scheduler.setThreadNamePrefix("taskScheduler-");
scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskRegistrar.setScheduler(scheduler);
}
}

自定义TaskScheduler

可以使用自定义 bean 的方式,来自定义 @Scheduled 注解使用的线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@EnableScheduling
@Configuration
public class TaskPoolConfig {
@Bean("taskScheduler")
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
// 此方法必须调用,否则会抛出ThreadPoolTaskScheduler未初始化异常
scheduler.initialize();
scheduler.setPoolSize(10);
scheduler.setThreadNamePrefix("taskScheduler-");
scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return scheduler;
}
}

@Scheduled注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
* 使用此注解标记要调度的方法时,必须指定 cron、fixedDelay 或 fixedRate 属性之一。
* 带此注解的方法必须没有参数。它通常有一个 void 返回类型;如果不是,则在通过调度程序调用时将忽略返回的值。
*
* @Scheduled 注解的处理是通过注册一个 ScheduledAnnotationBeanPostProcessor 来执行的。这可以手动完成,或者更方便的是,通过 <task:annotation-driven> 元素或 @EnableScheduling 注释来完成。
* 此注释可用作元注释以创建具有属性覆盖的自定义组合注释。
*
* See Also: EnableScheduling, ScheduledAnnotationBeanPostProcessor, Schedules
*/
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class) // Java 8 开始支持,标识某注解可以在同一个声明上使用多次。
public @interface Scheduled {
/**
* 表示禁用触发器的特殊 cron 表达式值:“-”。这主要用于 {...} 占位符,允许外部禁用相应的计划方法。
*/
String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;

/**
* 类似 cron 的表达式,扩展了通常的 UNX 定义以包括秒、分钟、小时、月中的某天、月、周中的某天的触发器。
*
* 例如,"0 * * * * MON-FRI"表示工作日每分钟一次(在分钟的顶部 - 第 0 秒)。
* 从左到右读取的字段解释:秒、分钟、小时、月中的某天、月、周中的某天。
*
* 特殊值“-”表示禁用的 cron 触发器,主要用于由 {...} 占位符解析的外部指定值。
*
* 返回:可以解析为 cron 计划的表达式
* See Also: org.springframework.scheduling.support.CronExpression#parse(String)
*/
String cron() default "";

/**
* 将解析 cron 表达式的时区。默认情况下,此属性为空字符串(即,将使用服务器的本地时区)。
*
* 返回: java.util.TimeZone.getTimeZone(String) 接受的区域 id,或表示服务器默认时区的空字符串
* See Also: org.springframework.scheduling.support.CronTrigger.CronTrigger(String, java.util.TimeZone), java.util.TimeZone
*/
String zone() default "";

/**
* 在最后一次调用结束和下一次调用开始之间以毫秒为单位执行带注释的方法。
*
* 返回:以毫秒为单位的延迟
*/
long fixedDelay() default -1;

/**
* 在最后一次调用结束和下一次调用开始之间以毫秒为单位执行带注释的方法。
*
* 返回: 以毫秒为单位的延迟作为字符串值,例如占位符或符合 java.time.Duration 的值
*/
String fixedDelayString() default "";

/**
* 在调用之间以毫秒为单位执行带注释的方法。
*
* 返回:以毫秒为单位的周期
*/
long fixedRate() default -1;

/**
* 在调用之间以毫秒为单位执行带注释的方法。
*
* 返回:以毫秒为单位的字符串值的周期,例如占位符或符合 java.time.Duration 的值
*/
String fixedRateString() default "";

/**
* 在第一次执行 fixedRate 或 fixedDelay 任务之前要延迟的毫秒数。
*
* 返回:以毫秒为单位的初始延迟
*/
long initialDelay() default -1;

/**
* 在第一次执行 fixedRate 或 fixedDelay 任务之前要延迟的毫秒数。
*
* 返回:以毫秒为单位的初始延迟,作为字符串值,例如占位符或符合 java.time.Duration 的值
*/
String initialDelayString() default "";
}

示例service

1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
public class ScheduleService {
@Scheduled(cron = "0 0/5 * * * ?")
@Scheduled(cron = "0 * * * * MON-FRI")
public void schedule1() {
// 因为两次使用@Scheduled注解,所以工作日每5分钟点会有两次打印
log.info("每5分钟运行一次,工作日每1分钟运行一次");
}
}

多个TaskScheduler实例

当容器中有多个 TaskScheduler 实例时,@Async 注解会使用名称为 taskScheduler 的那个bean,修改配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@EnableScheduling
@Configuration
public class TaskPoolConfig {
@Bean("aaa")
// @Primary
public TaskScheduler aaa() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize(); // 此方法必须调用,否则会抛出ThreadPoolTaskScheduler未初始化异常
scheduler.setPoolSize(10);
scheduler.setThreadNamePrefix("aaa-");
scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return scheduler;
}

@Bean("bbb")
public TaskScheduler bbb() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize(); // 此方法必须调用,否则会抛出ThreadPoolTaskScheduler未初始化异常
scheduler.setPoolSize(10);
scheduler.setThreadNamePrefix("bbb-");
scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return scheduler;
}
}

注意控制台输出:

1
2022-01-13 15:12:07.100  INFO 14880 --- [           main] s.a.ScheduledAnnotationBeanPostProcessor : More than one TaskScheduler bean exists within the context, and none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: [aaa, bbb]

意思是:在上下文中发现了多个TaskScheduler bean,并且没有一个被命名为“taskScheduler”。将其中一个标记为主要或将其命名为“taskScheduler”(可能作为别名);或实现 SchedulingConfigurer 接口,并在 configureTasks() 回调中显式调用 ScheduledTaskRegistrar#setScheduler:【aaa,bbb】

共用一个线程池

可不可以让异步任务和任务调度使用一个线程池呢?我还没找到办法。。。