本文使用的CompletableFuture版本为java 8(java 11的CompletableFuture新增了一些方法)。
简介
Java 8 新增加了 CompletableFuture
类,该类提供了非常强大的 Future
扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture
的方法。
CompletableFuture 实现了 Future 和 CompletionStage 两个接口。
Future
它用来描述一个异步计算的结果。isDone()
方法可以用来检查计算是否完成,get()
方法可以用来获取结果,直到完成前一直阻塞当前线程,cancel()
方法可以取消任务。而对于结果的获取,只能通过阻塞 get()
或者轮询的方式 while(!isDone)
。阻塞的方式违背了异步编程的理念,轮询的方式耗费无谓的CPU资源(CPU空转)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
|
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
|
CompletionStage
一个CompletionStage对象是异步计算中的一个阶段,当一个CompletionStage完成时会触发下一个阶段。
CompletionStage解决了Future的一些问题:Future没有提供通知机制,对于结果的获取,只能通过阻塞 get()
或者轮询的方式 while(!isDone)
。CompletionStage完美的解决了该问题,前一个阶段执行成功后可以自动触发下一个阶段的执行(回调),中间无需等待。而且CompletableFuture还能够将任务放到不同的线程中执行,既可以在当前线程中直接执行任务,也可以将其放到其他任务线程中执行,这个过程是自动的,无需干预。
CompletionStage的方法虽多,但有命名规则,从名字上可以轻松知道这些方法是做什么的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
public interface CompletionStage<T> { }
|
阶段区分
1
| stage.thenApply(x -> square(x));
|
stage:此阶段。
thenApply返回值:下一个阶段。
thenXXX()
以then开头的方法,表示当任务正常完成后,便执行指定的回调逻辑。有的回调可以有返回值(CompletableFuture将返回值作为任务结果),有的回调没有返回值。
方法名 |
回调方法 |
回调方法参数 |
回调方法返回值 |
thenApply |
Function |
此阶段结果 |
阶段的泛型值 |
thenAccept |
Consumer |
此阶段结果 |
无 |
thenRun |
Runnable |
无 |
无 |
thenCombine |
BiFunction |
此阶段结果和另一个给定阶段的结果 |
阶段的泛型值 |
thenAcceptBoth |
BiConsumer |
此阶段结果和另一个给定阶段的结果 |
无 |
thenCompose |
Function |
此阶段结果 |
阶段 |
注意:thenCompose()
方法的回调方法的返回值是一个 阶段,而非阶段的泛型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);
public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor);
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);
|
runXXX()
回调方法是Runnable,无参数,无返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);
public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);
|
XXXEither()
applyToEither()
方法有三,表示此阶段或者other阶段,任意一个正常完成后,执行fn逻辑。
回调方法是Function,参数为此阶段结果或另一个给定阶段的结果,取决于哪个先完成,有返回值。
1 2 3 4 5 6 7 8 9 10 11
|
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);
|
acceptEither()
和 applyToEither 方法相似。
方法有三,表示此阶段或者other阶段,任意一个正常完成后,执行action逻辑。
回调方法是Consumer,参数为此阶段结果或另一个给定阶段的结果,取决于哪个先完成,无返回值。
1 2 3 4 5 6 7 8 9 10
|
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);
|
exceptionally()
如果此阶段执行过程中抛出了异常,那么将异常对象传输给 fn,fn 对异常对象做处理。如果任务正常结束了,直接将正常完成的结果作为返回的 CompletionStage 对象的结果(泛型)。
回调方法是Function,参数为此阶段抛出的异常,有返回值。
1 2 3 4 5 6 7 8 9
|
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
|
whenComplete()
方法有三,表示无论此阶段正常完成还是异常完成,都会执行action回调。
回调方法是BiConsumer,参数为 此阶段的结果 和 此阶段抛出的异常,这两个参数都可以为null。该函数式接口的方法无返回值,但是whenComplete方法有返回值。
如果提供的 action 正常完成:whenComplete方法返回的新阶段将保留此阶段的结果或异常(异常和结果不会同时存在,其中一个是 null
)。
调用CompletableFuture的get()时,正常完成时就获取到结果,出现异常时就会抛出异常。
如果提供的 action 遇到异常:返回新阶段 异常完成 并携带这个异常。
1 2 3 4 5 6 7 8 9 10
|
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);
|
应用场景:发生异常后回滚等操作。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class Test { public static void main(String[] args) { try { System.out.println(Thread.currentThread().getName()); whenComplete(); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); }
public static void whenComplete() throws ExecutionException, InterruptedException { Executor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5, true));
CompletableFuture<Long> future = CompletableFuture .supplyAsync(() -> { System.out.println("step1 threadName: " + Thread.currentThread().getName()); int i = 10 / 0; return System.currentTimeMillis(); }, executor) .whenCompleteAsync( new BiConsumer<Long, Throwable>() { @Override public void accept(Long aLong, Throwable throwable) { System.out.println("step2 threadName(whenComplete): " + Thread.currentThread().getName()); System.out.println("正常完成: " + aLong); System.out.println("异常完成: " + throwable.getMessage()); } }, executor) .exceptionally(new Function<Throwable, Long>() { @Override public Long apply(Throwable throwable) { System.out.println("step3 threadName(exceptionally): " + Thread.currentThread().getName()); System.out.println("执行失败!" + throwable.getMessage()); return null; } });
System.out.println("future最终结果: " + future.get()); } }
|
handle()
方法有三,表示无论此阶段正常完成还是异常完成,都会执行fn回调。
回调是BiFunction,参数为 此阶段的结果 和 此阶段抛出的异常,这两个参数都可以为null,该函数式接口的方法有返回值。
1 2 3 4 5 6 7 8 9 10 11 12
|
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
|
与 whenComplete 方法区别:
- whenComplete 方法的回调没有返回值,handle 方法的回调有返回值。
- whenComplete 方法返回的新阶段将保留此阶段的结果或异常,而 handle 方法返回的新阶段的泛型值是回调方法 fn 的返回值。也就是说 handle 方法回调函数的返回值会影响最终的计算结果。
应用场景:
- 若此阶段正常完成,加工结果。
- 若此阶段异常完成,返回一个默认值并回滚。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class Test { public static void main(String[] args) { try { System.out.println(Thread.currentThread().getName()); whenComplete(); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); }
public static void whenComplete() throws ExecutionException, InterruptedException { Executor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5, true));
CompletableFuture<Long> future = CompletableFuture .supplyAsync(() -> { System.out.println("step1 threadName: " + Thread.currentThread().getName()); int i = 10 / 0; return System.currentTimeMillis(); }, executor) .handleAsync(new BiFunction<Long, Throwable, Long>() { @Override public Long apply(Long aLong, Throwable throwable) { System.out.println("step2 threadName(handleAsync): " + Thread.currentThread().getName()); if (throwable == null) { return aLong; } else { System.out.println(throwable.getMessage()); return -1L; } } }, executor);
System.out.println("future最终结果: " + future.get()); } }
|
toCompletableFuture()
1 2 3 4 5 6 7 8 9 10
|
public CompletableFuture<T> toCompletableFuture();
|
CompletableFuture 类是这样实现的
1 2 3
| public CompletableFuture<T> toCompletableFuture() { return this; }
|
CompletableFuture
简介
CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调,支持流式(Stream)的计算处理,使 Java 在处理多任务的协同工作时更加顺畅便利。
比如:结果的合并;对结果异步的处理;阶段1产生的结果可以直接作为阶段2的入参,参与阶段2的计算,以此类推。这些功能放到 Futrue 身上也能实现,但现成 API 更简洁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}
|
如果任务很耗时,记得传 Executor,或者方法末尾加上 future.get()
,因为 CompletableFuture 默认使用 ForkJoinPool,而 ForkJoinPool 里面的线程都是daemon线程,主线程跑完就 over 了。
静态方法
创建异步操作的四个静态方法:
1 2 3 4
| public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
|
前两个回调方法是 Runnable,无返回值。
后两个回调方法是 Supplier,有返回值。
入参有一个线程池 Executor,如果不指定就使用默认的线程池 {@link ForkJoinPool#commonPool()}
。如果机器是单核的,则默认使用 ThreadPerTaskExecutor
,该类是一个内部类,每次执行execute都会创建一个新线程。详情请查看 CompletableFuture 源码。
创建一个没有任何操作的 CompletableFuture 对象:
1 2 3 4 5 6 7 8
|
public static <U> CompletableFuture<U> completedFuture(U value)
|
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
public static void runAsync() throws ExecutionException, InterruptedException { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("run end ..."); }); Void voidResult = future.get(); System.out.println("voidResult = " + voidResult); }
public static void supplyAsync() throws ExecutionException, InterruptedException { CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("run end ..."); return System.currentTimeMillis(); }); long completeTime = future.get(); System.out.println("completeTime = " + completeTime); }
|
主动完成
complete()
1 2 3 4 5 6 7
|
public boolean complete(T value)
|
为什么叫CompletableFuture?CompletableFuture字面翻译过来,就是“可完成的Future”。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回。
completeExceptionally()
1 2 3 4 5 6 7
|
public boolean completeExceptionally(Throwable ex)
|
isCompletedExceptionally()
1 2 3 4 5 6
|
public boolean isCompletedExceptionally()
|
示例
complete()
或 completeExceptionally()
只能调用一次,后续调用将被忽略。若想覆盖 future 之前的值请小心使用 CompletableFuture.obtrudeValue()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class Test { public static void main(String[] args) { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return 1; });
future.complete(100); future.completeExceptionally(new Exception("手动异常完成"));
System.out.println(future.isCompletedExceptionally()); System.out.println(future.join()); } }
|
主动完成-强加值
obtrudeValue()
1 2 3 4 5 6 7
|
public void obtrudeValue(T value)
|
obtrudeException()
1 2 3 4 5 6 7 8
|
public void obtrudeException(Throwable ex)
|
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class Test { public static void main(String[] args) { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return 1; });
future.obtrudeValue(100); future.obtrudeValue(200);
System.out.println(future.isCompletedExceptionally()); System.out.println(future.join()); } }
|
getNow()
1 2 3 4 5 6 7 8 9
|
public T getNow(T valueIfAbsent)
|
join()
最简单的方式就是通过构造函数创建一个CompletableFuture实例。如下代码所示。由于新创建的CompletableFuture还没有任何计算结果,这时调用join,当前线程会一直阻塞在这里。
1 2 3
| CompletableFuture<String> future = new CompletableFuture<>(); String result = future.join(); System.out.println(result);
|
此时,如果在另外一个线程中,主动设置该CompletableFuture的值,则上面线程中的结果就能返回。
1
| future.complete("test");
|
与 Future.get()
的区别:
1
| V get() throws InterruptedException, ExecutionException;
|
get()
方法上声明了异常,所以必须捕获。
join()
方法上未声明异常,不用捕获,但可能抛出异常。
两者抛出的异常不同。
allOf()-与
1 2 3 4 5 6 7 8 9 10 11 12 13
|
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
|
allOf()
的返回值是 CompletableFuture<Void>
类型,这是因为每个传入的 CompletableFuture 的返回值都可能不同,所以组合的结果无法用某种类型来表示,索性返回 Void
类型。那么,如何获取每个 CompletableFuture 的执行结果呢?
因为 allof()
没有返回值,所以通过 theApply()
,给 allFutures 附上一个回调函数,在回调函数里面,调用每一个 Future 的 get()/join()
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class Test { public static void main(String[] args) { Executor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5, true));
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return 1; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return "string"; }, executor); CompletableFuture<Boolean> future3 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return true; }, executor); CompletableFuture.allOf(future1, future2, future3).thenApply((res) -> { System.out.println(Thread.currentThread().getName()); System.out.println("future1结果: " + future1.join()); System.out.println("future2结果: " + future2.join()); System.out.println("future3结果: " + future3.join()); return "结束"; });
System.exit(0); } }
|
anyOf()-或
1 2 3 4 5 6 7 8
|
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
|
任意一个 CompletableFuture 结束,就返回一个和它结果或异常相同的新的 CompletableFuture。
由于每个 CompletableFuture 的返回值类型都可能不同,无法判断最终返回类型,所以 anyOf 的返回值是 CompletableFuture<Object>
类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class Test { public static void main(String[] args) { Executor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5, true));
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return 1; }, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return "string"; }, executor);
CompletableFuture<Boolean> future3 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return true; }, executor);
CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1, future2, future3); resultFuture.thenApply((res) -> { System.out.println(Thread.currentThread().getName()); System.out.println(res.getClass() + " 结果: " + res); return "结束"; });
System.exit(0); } }
|
getNumberOfDependents()
1 2 3 4 5 6
|
public int getNumberOfDependents()
|