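Introduction

Spring provides the @Async annotation for running tasks asynchronously and the @Scheduled annotation for running scheduled tasks. Each has a default thread pool configuration, which can be changed through the configuration file or by defining a custom bean.

To use @Async and @Scheduled, add @EnableAsync and @EnableScheduling to a configuration class, the application's main class, or any other bean class managed by the Spring container (a controller or service class, for example).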
Customizing the async task thread pool

Configuration file

In Spring Boot, the thread pool can be configured in the configuration file as shown below. The default name of the thread pool bean is taskExecutor.

```yaml
spring:
  task:
    execution:
      pool:
        core-size: 10
        max-size: 20
        queue-capacity: 1000
        keep-alive: 60s
        allow-core-thread-timeout: true
      thread-name-prefix: taskExecutor-
```
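Another approach is to implement the AsyncConfigurer interface and return a custom thread pool from its getAsyncExecutor method:

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
    @Override
    public TaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(1000);
        executor.setKeepAliveSeconds(60);
        executor.setAllowCoreThreadTimeOut(true);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("taskExecutor-");
        // This executor is not a Spring bean, so initialize it manually,
        // and only after every property has been set.
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncExceptionHandler();
    }
}
```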
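Custom TaskExecutor

The thread pool used by @Async can also be customized by defining a bean.

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
class TaskPoolConfig {
    @Bean("taskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(1000);
        executor.setKeepAliveSeconds(60);
        executor.setAllowCoreThreadTimeOut(true);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("taskExecutor-");
        // No explicit initialize() call: Spring initializes the bean
        // (via afterPropertiesSet) once this method returns.
        return executor;
    }
}
```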
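To make the interplay of pool size, queue capacity, and CallerRunsPolicy concrete, here is a minimal sketch against the plain JDK ThreadPoolExecutor, which is what ThreadPoolTaskExecutor wraps. The class name CallerRunsDemo and the tiny sizes (one thread, one queue slot) are assumptions chosen only to force saturation quickly; they are not values from the configuration above.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CallerRunsDemo {
    /** Submits three tasks to a saturated pool and returns task name -> thread name. */
    public static Map<String, String> run() {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,            // core size 1, max size 1
                new ArrayBlockingQueue<>(1),           // queue capacity 1
                r -> new Thread(r, "taskExecutor-" + counter.getAndIncrement()),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Map<String, String> threads = new ConcurrentHashMap<>();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupies the only worker thread until the latch is released.
            pool.execute(() -> {
                threads.put("first", Thread.currentThread().getName());
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Worker is busy, so this task waits in the queue.
            pool.execute(() -> threads.put("second", Thread.currentThread().getName()));
            // Queue is full and the pool cannot grow: the task is rejected,
            // and CallerRunsPolicy runs it on the submitting thread.
            pool.execute(() -> threads.put("third", Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The first two tasks run on the pool thread taskExecutor-1, while the third runs on whichever thread called execute. With CallerRunsPolicy a saturated pool therefore slows the caller down rather than dropping work, which is worth keeping in mind when the caller is a request thread.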
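The @Async annotation

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
    // Name of the executor bean to use; empty means the default executor.
    String value() default "";
}
```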
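Notes:

@Async uses the thread pool whose bean name is taskExecutor by default, and a different pool can be specified through the annotation's value.

Within the same class, if one method (including an @Async method) calls another @Async method, the second method does not run asynchronously. The call goes straight to the target object instead of through the Spring proxy; this is the proxy bypass (self-invocation) problem.

The class containing the @Async method must be managed by the Spring container, and the instance on which the @Async method is called must be the one from the container. A call on an object created manually with new does not run asynchronously.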
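The proxy bypass can be reproduced without Spring. The sketch below uses a JDK dynamic proxy as a stand-in for the proxy Spring creates around an @Async bean; Service, ServiceImpl, and SelfInvocationDemo are hypothetical names, and the handler merely records which calls it gets to see instead of dispatching them to a thread pool.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class SelfInvocationDemo {
    interface Service {
        void outer();
        void inner();
    }

    static class ServiceImpl implements Service {
        @Override
        public void outer() {
            inner(); // plain this.inner(): never reaches the proxy
        }

        @Override
        public void inner() {
        }
    }

    /** Returns the method names the proxy intercepted when a caller invokes outer(). */
    public static List<String> interceptedCalls() {
        List<String> seen = new ArrayList<>();
        Service target = new ServiceImpl();
        // The handler is where an async interceptor would hand the call to an executor.
        InvocationHandler handler = (proxy, method, args) -> {
            seen.add(method.getName());
            return method.invoke(target, args);
        };
        Service proxied = (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(), new Class<?>[] {Service.class}, handler);
        proxied.outer();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls()); // prints [outer]
    }
}
```

Only outer is intercepted; the nested call to inner happens on the target object itself. The same reasoning explains why calling a method on an object created with new, which has no proxy at all, is never asynchronous.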
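Example

Controller

```java
import java.util.concurrent.CompletableFuture;

// javax.servlet matches Spring Boot 2.x; on Spring Boot 3+ the package is jakarta.servlet.
import javax.servlet.http.HttpServletRequest;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class DemoController {
    @Autowired
    private AsyncService asyncService;

    @RequestMapping(value = "/asyncTask", method = RequestMethod.GET)
    public String asyncTask(HttpServletRequest request) {
        log.info("ThreadName: " + Thread.currentThread().getName() + ", MethodName: asyncTask");
        CompletableFuture<String> future = asyncService.asyncMethod();
        System.out.println("Other work can be done here");
        System.out.println("Other work can be done here");
        System.out.println("Other work can be done here");
        // join() blocks like get() but throws no checked exceptions,
        // so the method needs no throws clause.
        return future.join();
    }
}
```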
Service

```java
import java.util.concurrent.CompletableFuture;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class AsyncService {
    @Async("taskExecutor")
    public CompletableFuture<String> asyncMethod() {
        // Self-invocation: this call does not go through the proxy.
        simpleAsyncMethod();
        String name = "MethodName: asyncMethod";
        log.info("ThreadName: " + Thread.currentThread().getName() + ", " + name);
        process(name);
        String processResult = "This is the final result of the async method";
        return CompletableFuture.completedFuture(processResult);
    }

    @Async
    public void simpleAsyncMethod() {
        String name = "MethodName: simpleAsyncMethod";
        log.info("ThreadName: " + Thread.currentThread().getName() + ", " + name);
        process(name);
    }

    // Simulates five seconds of work, logging once per second.
    private void process(String methodName) {
        int i = 1;
        while (i <= 5) {
            try {
                Thread.sleep(1000);
                log.info(methodName + ", process " + i++);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop instead of retrying forever.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```
Result: the call to asyncMethod from the request thread is asynchronous, but the call from asyncMethod to simpleAsyncMethod is not; both methods run on the same thread.

```
2022-01-13 12:07:23.568 INFO 27968 --- [nio-8855-exec-1] c.z.service.controller.DemoController : ThreadName: http-nio-8855-exec-1, MethodName: asyncTask
2022-01-13 12:07:23.574 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : ThreadName: taskExecutor-1, MethodName: simpleAsyncMethod
2022-01-13 12:07:24.585 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: simpleAsyncMethod, process 1
2022-01-13 12:07:25.599 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: simpleAsyncMethod, process 2
2022-01-13 12:07:26.613 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: simpleAsyncMethod, process 3
2022-01-13 12:07:27.613 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: simpleAsyncMethod, process 4
2022-01-13 12:07:28.628 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: simpleAsyncMethod, process 5
2022-01-13 12:07:28.628 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : ThreadName: taskExecutor-1, MethodName: asyncMethod
2022-01-13 12:07:29.629 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: asyncMethod, process 1
2022-01-13 12:07:30.641 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: asyncMethod, process 2
2022-01-13 12:07:31.642 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: asyncMethod, process 3
2022-01-13 12:07:32.642 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: asyncMethod, process 4
2022-01-13 12:07:33.654 INFO 27968 --- [ taskExecutor-1] com.zhaolq.service.service.AsyncService : MethodName: asyncMethod, process 5
```
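Exception handling

If exceptions thrown by @Async methods need special handling, implement the AsyncUncaughtExceptionHandler interface and handle them there:

```java
import java.lang.reflect.Method;

import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;

@Slf4j
public class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.error("caught async exception", ex);
    }
}
```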
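Then implement the AsyncConfigurer interface and return that handler from it:

```java
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;

@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncExceptionHandler();
    }
}
```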
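One caveat: AsyncUncaughtExceptionHandler is meant for @Async methods that return void. When the method returns a Future, as asyncMethod in the example does, the exception is delivered through the returned future and has to be handled by whoever reads it. The following plain-JDK sketch shows that behavior with CompletableFuture.supplyAsync standing in for the async call; FutureErrorDemo and the "boom" message are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class FutureErrorDemo {
    /** Runs a failing async task and describes the failure seen by the caller. */
    public static String describeFailure() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            return future.join();
        } catch (CompletionException e) {
            // join() wraps the original exception; the cause is what the task threw.
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints IllegalStateException: boom
    }
}
```

In a controller like the one above, this means a failure inside the async method surfaces at the point where the result is read, not in the uncaught exception handler.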
Multiple TaskExecutor instances

When the container holds several TaskExecutor instances, @Async uses the bean named taskExecutor. Change the configuration as follows and call the endpoint again:

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
class TaskPoolConfig {
    @Bean("aaa")
    public TaskExecutor aaa() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(1000);
        executor.setKeepAliveSeconds(60);
        executor.setAllowCoreThreadTimeOut(true);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("aaa-");
        // Spring initializes the bean after this method returns.
        return executor;
    }

    @Bean("bbb")
    public TaskExecutor bbb() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(1000);
        executor.setKeepAliveSeconds(60);
        executor.setAllowCoreThreadTimeOut(true);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("bbb-");
        return executor;
    }
}
```
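Note the console output:

```
2022-01-13 14:53:59.181 INFO 29456 --- [nio-8855-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: [aaa, taskScheduler]
```

In other words, several TaskExecutor beans were found in the context and none of them is named taskExecutor. For one of them to be used for async processing, it has to be marked as primary or named taskExecutor (an alias is enough). The candidate beans are listed at the end of the message.

The taskScheduler bean in that list is created by the framework by default. It is the thread pool used to run scheduled tasks, and it is also a TaskExecutor instance.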
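Customizing the task scheduling thread pool

Configuration file

```yaml
spring:
  task:
    scheduling:
      pool:
        size: 10
      thread-name-prefix: taskScheduler-
```

Alternatively, implement the SchedulingConfigurer interface and register a custom scheduler in configureTasks:

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

@EnableScheduling
@Configuration
public class SchedulingConfig implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        scheduler.setThreadNamePrefix("taskScheduler-");
        scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Not a Spring bean, so initialize it manually after all properties are set.
        scheduler.initialize();
        taskRegistrar.setScheduler(scheduler);
    }
}
```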
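Custom TaskScheduler

The thread pool used by @Scheduled can also be customized by defining a bean.

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@EnableScheduling
@Configuration
public class TaskPoolConfig {
    @Bean("taskScheduler")
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        scheduler.setThreadNamePrefix("taskScheduler-");
        scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Spring initializes the bean after this method returns.
        return scheduler;
    }
}
```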
The @Scheduled annotation

```java
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
    String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;

    String cron() default "";

    String zone() default "";

    long fixedDelay() default -1;

    String fixedDelayString() default "";

    long fixedRate() default -1;

    String fixedRateString() default "";

    long initialDelay() default -1;

    String initialDelayString() default "";
}
```
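Example service

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ScheduleService {
    // @Scheduled is repeatable, so both triggers apply to the same method.
    @Scheduled(cron = "0 0/5 * * * ?")
    @Scheduled(cron = "0 * * * * MON-FRI")
    public void schedule1() {
        log.info("Runs every 5 minutes, and every minute on weekdays");
    }
}
```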
Multiple TaskScheduler instances

When the container holds several TaskScheduler instances, @Scheduled uses the bean named taskScheduler. Change the configuration as follows:

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@EnableScheduling
@Configuration
public class TaskPoolConfig {
    @Bean("aaa")
    public TaskScheduler aaa() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        scheduler.setThreadNamePrefix("aaa-");
        scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Spring initializes the bean after this method returns.
        return scheduler;
    }

    @Bean("bbb")
    public TaskScheduler bbb() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        scheduler.setThreadNamePrefix("bbb-");
        scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return scheduler;
    }
}
```
Note the console output:

```
2022-01-13 15:12:07.100 INFO 14880 --- [ main] s.a.ScheduledAnnotationBeanPostProcessor : More than one TaskScheduler bean exists within the context, and none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: [aaa, bbb]
```

In other words, several TaskScheduler beans (aaa and bbb) were found in the context and none of them is named taskScheduler. The fix is to mark one of them as primary or name it taskScheduler (an alias is enough), or to implement SchedulingConfigurer and call ScheduledTaskRegistrar#setScheduler explicitly in the configureTasks() callback.
Sharing one thread pool

Can async tasks and scheduled tasks share a single thread pool? I have not found a way yet.
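One direction that may be worth trying, untested here: since the taskScheduler bean is also a TaskExecutor, a single ThreadPoolTaskScheduler bean could in principle be referenced by name from @Async while also serving @Scheduled. The sketch below demonstrates only the underlying JDK idea, a ScheduledExecutorService acting as both a plain executor and a scheduler; SharedPoolDemo and the shared- thread prefix are hypothetical, and nothing here confirms how Spring would behave with such a setup.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedPoolDemo {
    /** Returns the thread names used by an immediate task and a delayed task. */
    public static String[] run() {
        AtomicInteger counter = new AtomicInteger(1);
        ScheduledExecutorService shared = Executors.newScheduledThreadPool(
                2, r -> new Thread(r, "shared-" + counter.getAndIncrement()));
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            // "Async" use: submit for immediate execution.
            Future<String> async = shared.submit(whoAmI);
            // "Scheduled" use: run the same kind of task after a delay.
            ScheduledFuture<String> scheduled =
                    shared.schedule(whoAmI, 50, TimeUnit.MILLISECONDS);
            return new String[] {
                async.get(5, TimeUnit.SECONDS),
                scheduled.get(5, TimeUnit.SECONDS)
            };
        } catch (Exception e) {
            throw new IllegalStateException("shared pool task failed", e);
        } finally {
            shared.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println(names[0] + " / " + names[1]);
    }
}
```

Both tasks run on threads named with the shared- prefix, so they come from the same pool. Whether sharing is a good idea is a separate question: long-running async work could delay scheduled triggers when the pool is small.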