Java8 CompletableFuture

本文使用的CompletableFuture版本为java 8(java 11的CompletableFuture新增了一些方法)。

简介

Java 8 新增加了 CompletableFuture 类,该类提供了非常强大的 Future 扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。

CompletableFuture 实现了 Future 和 CompletionStage 两个接口。

Future

它用来描述一个异步计算的结果。isDone() 方法可以用来检查计算是否完成,get() 方法可以用来获取结果,直到完成前一直阻塞当前线程,cancel() 方法可以取消任务。而对于结果的获取,只能通过阻塞 get() 或者轮询的方式 while(!isDone) 。阻塞的方式违背了异步编程的理念,轮询的方式耗费无谓的CPU资源(CPU空转)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Future 表示异步计算的结果。
*/
public interface Future<V> {
/**
* 尝试取消此任务的执行。
* 取消失败:如果任务已完成、已被取消或由于其他原因无法取消,则此尝试将失败。
* 取消成功:如果取消成功,并且在调用取消时此任务尚未启动,则不应运行此任务。如果任务已经开始,则 mayInterruptIfRunning 参数确定是否应该中断执行此任务的线程以尝试停止任务。
*
* 此方法返回后,对 isDone 的后续调用将始终返回 true。
* 如果此方法返回 true,则对 isCancelled 的后续调用将始终返回 true。
*
* 参数:mayInterruptIfRunning——如果执行此任务的线程应该被中断,则为true;否则,允许完成正在进行的任务。
* 返回:如果任务无法取消,则返回 false,通常是因为它已经正常完成;否则为true。
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* 如果此任务在完成之前被取消,则返回 true。
*/
boolean isCancelled();

/**
* 如果此任务完成,则返回 true。完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回 true。
*/
boolean isDone();

/**
* 如有必要,等待计算完成,然后检索其结果。
* 返回:计算结果。
* 抛出:CancellationException – 如果计算被取消
* ExecutionException – 如果计算抛出异常
* InterruptedException – 如果当前线程在等待时被中断
*/
V get() throws InterruptedException, ExecutionException;

/**
* 如有必要,最多等待给定的计算完成时间,然后检索其结果(如果可用)。
* 参数:timeout - 最长等待时间
* unit - 超时参数的时间单位
* 返回:计算结果。
* 抛出:CancellationException – 如果计算被取消
* ExecutionException – 如果计算抛出异常
* InterruptedException – 如果当前线程在等待时被中断
* TimeoutException – 如果等待超时
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

CompletionStage

一个CompletionStage对象是异步计算中的一个阶段,当一个CompletionStage完成时会触发下一个阶段。

CompletionStage解决了Future的一些问题:Future没有提供通知机制,对于结果的获取,只能通过阻塞 get() 或者轮询的方式 while(!isDone) 。CompletionStage完美的解决了该问题,前一个阶段执行成功后可以自动触发下一个阶段的执行(回调),中间无需等待。而且CompletableFuture还能够将任务放到不同的线程中执行,既可以在当前线程中直接执行任务,也可以将其放到其他任务线程中执行,这个过程是自动的,无需干预。

CompletionStage的方法虽多,但有命名规则,从名字上可以轻松知道这些方法是做什么的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 一个可能的异步计算 stage(阶段),在另一个 CompletionStage 完成时执行操作或计算值。一个 stage 在其计算终止时完成,但这可能反过来触发其他相关 stage。
*
* 此接口中定义的功能仅采用几种基本形式,这些形式扩展为更大的方法集,以捕获一系列使用风格:
* 1、阶段执行的计算可以表示为 Function、Consumer 或 Runnable(使用名称分别包括 apply、accept 或 run 的方法),具体取决于它是否需要参数或产生结果。
* 例如 stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println()).
* 2、一个阶段的执行可以由单个阶段的完成、或两个阶段的完成、或两个阶段中的任何一个的完成触发。
* !!!由两个阶段中的任何一个触发的那些不能保证哪些结果或效果用于依赖阶段的计算!!!
* 3、阶段之间的依赖关系控制计算的触发,但不保证任何特定的顺序。此外,新阶段计算的执行可以通过以下三种方式中的任何一种进行安排:
* a) 默认执行
* b) 默认异步执行(使用带有后缀 async 的方法,该方法采用阶段的默认异步执行工具)
* c) 自定义(通过提供的 Executor)。
* a 和 b 的执行属性由 CompletionStage 实现指定,而不是此接口。
* 具有显式 Executor 参数的方法可能具有任意执行属性,甚至可能不支持并发执行,但会以适应异步的方式进行处理。
* 4、两种方法形式用来处理 触发阶段是正常完成还是异常完成:
* whenComplete 方法:无论此阶段结果如何,都允许注入一个action。
* 如果action正常完成,whenComplete方法返回的新阶段将保留此阶段的结果或异常(异常和结果不会同时存在,其中一个是null)。
* 如果action遇到异常,返回新阶段 异常完成 并携带这个异常。
* handle 方法:允许新阶段处理此阶段的结果并返回,该结果同样允许其他相关阶段进行进一步处理。
* 在所有其他情况下,如果一个阶段的计算因(未经检查的)异常或错误而突然终止,则所有需要其完成的相关阶段也会异常完成,并且将 CompletionException 异常作为其原因。
* 如果一个阶段依赖于两个阶段,并且都异常完成,那么 CompletionException 可能对应于这些异常中的任何一个。
* 如果一个阶段依赖于其他两个阶段中的任何一个,并且其中只有一个异常完成,则无法保证依赖阶段是正常完成还是异常完成。
*
* 所有方法都遵循上述触发、执行和异常完成规范(在单个方法规范中不再重复)。此外,用于接收阶段完成结果的参数(即 T 类型的参数)可以为 null,但任何其他参数传递 null 值将引发 NullPointerException。
*
* 该接口未定义初始创建、正常或异常强制完成、探测完成状态或结果或等待阶段完成的方法。CompletionStage 的实现可以酌情提供实现这种效果的方法。
* 方法 toCompletableFuture 通过提供一个通用的转换类型来实现此接口的不同实现之间的互操作性。
*/
public interface CompletionStage<T> {
}

阶段区分

1
stage.thenApply(x -> square(x));

stage:此阶段。

thenApply返回值:下一个阶段。

thenXXX()

以then开头的方法,表示当任务正常完成后,便执行指定的回调逻辑。有的回调可以有返回值(CompletableFuture将返回值作为任务结果),有的回调没有返回值。

方法名 回调方法 回调方法参数 回调方法返回值
thenApply Function 此阶段结果 阶段的泛型值
thenAccept Consumer 此阶段结果
thenRun Runnable
thenCombine BiFunction 此阶段结果和另一个给定阶段的结果 阶段的泛型值
thenAcceptBoth BiConsumer 此阶段结果和另一个给定阶段的结果
thenCompose Function 此阶段结果 阶段

注意:thenCompose() 方法的回调方法的返回值是一个 阶段,而非阶段的泛型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
* 返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段的结果作为所提供函数的参数来执行该阶段。
*
* 参数:fn – 用于计算返回的 CompletionStage 值的函数
* 类型参数:<U> – 函数的返回类型
* 返回:新的 CompletionStage
*/
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); // 使用此阶段的默认异步执行工具执行
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor); // 使用提供的执行器 Executor 执行

/**
* 返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段的结果作为所提供操作的参数来执行该阶段。
*
* 参数:action - 在完成返回的 CompletionStage 之前要执行的操作
* 返回:新的 CompletionStage
*/
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); // 使用此阶段的默认异步执行工具执行
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor); // 使用提供的执行器 Executor 执行

/**
* 返回一个新的 CompletionStage,当此阶段正常完成时,执行给定操作。不接受其他任务结果参数
*
* 参数:action - 在完成返回的 CompletionStage 之前要执行的操作
* 返回:新的 CompletionStage
*/
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action); // 使用此阶段的默认异步执行工具执行
public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor); // 使用提供的执行器 Executor 执行

/**
* 返回一个新的 CompletionStage,当此阶段和另一个给定阶段都正常完成时,将使用两个结果作为所提供函数的参数执行。
*
* 参数:other – 另一个 CompletionStage
* fn – 用于计算返回的 CompletionStage 值的函数
* 类型参数:<U> – 其他 CompletionStage 的结果类型
* <V> – 函数的返回类型
* 返回:新的 CompletionStage
*/
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn); // 使用此阶段的默认异步执行工具执行
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor); // 使用提供的执行器 Executor 执行

/**
* 返回一个新的 CompletionStage,当此阶段和另一个给定阶段都正常完成时,将使用两个结果作为所提供操作的参数执行。
*
* 参数:other – 另一个 CompletionStage
* action – 在完成返回的 CompletionStage 之前要执行的操作
* 类型参数:<U> – 其他 CompletionStage 的结果类型
* 返回:新的 CompletionStage
*/
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action); // 使用此阶段的默认异步执行工具执行
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor); // 使用提供的执行器 Executor 执行

/**
* 返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段作为所提供函数的参数执行。
*
* 参数:fn – 返回新的 CompletionStage 的函数
* 类型参数:<U> – 返回的 CompletionStage 结果的类型
* 返回:CompletionStage
*/
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn); // 使用此阶段的默认异步执行工具执行
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor); // 使用提供的执行器 Executor 执行

runXXX()

回调方法是Runnable,无参数,无返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 返回一个新的 CompletionStage,当此阶段和另一个给定阶段都正常完成时,执行给定操作。
*
* 参数:other – 另一个 CompletionStage
* action – 在完成返回的 CompletionStage 之前要执行的操作
* 返回:新的 CompletionStage
*/
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action); // 使用此阶段的默认异步执行工具执行
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor); // 使用提供的执行器 Executor 执行

/**
* 返回一个新的 CompletionStage,当此阶段或另一个给定阶段正常完成时,执行给定操作。
*
* 参数:other – 另一个 CompletionStage
* action – 在完成返回的 CompletionStage 之前要执行的操作
* 返回:新的 CompletionStage
*/
public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action); // 使用此阶段的默认异步执行工具执行
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor); // 使用提供的执行器 Executor 执行

XXXEither()

applyToEither()

方法有三,表示此阶段或者other阶段,任意一个正常完成后,执行fn逻辑。

回调方法是Function,参数为此阶段结果或另一个给定阶段的结果,取决于哪个先完成返回值。

1
2
3
4
5
6
7
8
9
10
11
/**
* 返回一个新的 CompletionStage,当此阶段或另一个给定阶段正常完成时,将使用相应的结果作为所提供函数的参数执行。
*
* 参数:other – 另一个 CompletionStage
* fn – 用于计算返回的 CompletionStage 值的函数
* 类型参数:<U> – 函数的返回类型
* 返回:新的 CompletionStage
*/
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn); // 使用此阶段的默认异步执行工具执行
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor); // 使用提供的执行器 Executor 执行

acceptEither()

和 applyToEither 方法相似。

方法有三,表示此阶段或者other阶段,任意一个正常完成后,执行action逻辑。

回调方法是Consumer,参数为此阶段结果或另一个给定阶段的结果,取决于哪个先完成返回值。

1
2
3
4
5
6
7
8
9
10
/**
* 返回一个新的 CompletionStage,当此阶段另一个给定阶段正常完成时,将使用相应的结果作为所提供操作的参数执行。
*
* 参数:other – 另一个 CompletionStage
* action – 在完成返回的 CompletionStage 之前要执行的操作
* 返回:新的 CompletionStage
*/
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action); // 使用此阶段的默认异步执行工具执行
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor); // 使用提供的执行器 Executor 执行

exceptionally()

如果此阶段执行过程中抛出了异常,那么将异常对象传输给 fn,fn 对异常对象做处理。如果任务正常结束了,直接将正常完成的结果作为返回的 CompletionStage 对象的结果(泛型)。

回调方法是Function,参数为此阶段抛出的异常,返回值。

1
2
3
4
5
6
7
8
9
/**
* 返回一个新的 CompletionStage
* 如果此阶段异常完成,将使用此阶段的异常作为所提供函数的参数来执行该阶段。
* 如果此阶段正常完成,则 exceptionally 方法返回的阶段也会以相同的值(此阶段的正常结果)正常完成。
*
* 参数:fn – 如果此 CompletionStage 异常完成,则用于计算返回的 CompletionStage 值的函数
* 返回:新的 CompletionStage
*/
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

whenComplete()

方法有三,表示无论此阶段正常完成还是异常完成,都会执行action回调。

回调方法是BiConsumer,参数为 此阶段的结果此阶段抛出的异常,这两个参数都可以为null。该函数式接口的方法返回值,但是whenComplete方法有返回值。

如果提供的 action 正常完成:whenComplete方法返回的新阶段将保留此阶段的结果或异常(异常和结果不会同时存在,其中一个是 null )。

调用CompletableFuture的get()时,正常完成时就获取到结果,出现异常时就会抛出异常。

如果提供的 action 遇到异常:返回新阶段 异常完成 并携带这个异常。

1
2
3
4
5
6
7
8
9
10
/**
* 返回一个与此阶段具有相同结果或异常的新 CompletionStage,它在此阶段完成时执行给定的操作。
* 当此阶段完成时,使用此阶段的结果(如果没有,则为 null)和异常(如果没有,则为 null)作为参数调用给定的操作,此阶段的结果和异常也是返回的新阶段的结果和异常。
*
* 参数:action——要执行的动作
* 返回:新的 CompletionStage
*/
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); // 使用此阶段的默认异步执行工具执行
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor); // 使用提供的执行器 Executor 执行

应用场景:发生异常后回滚等操作。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Test {
public static void main(String[] args) {
try {
System.out.println(Thread.currentThread().getName());
whenComplete();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.exit(0);
}

public static void whenComplete() throws ExecutionException, InterruptedException {
Executor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5, true));

CompletableFuture<Long> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("step1 threadName: " + Thread.currentThread().getName());
int i = 10 / 0;
return System.currentTimeMillis();
}, executor)
.whenCompleteAsync(
new BiConsumer<Long, Throwable>() {
@Override
public void accept(Long aLong, Throwable throwable) {
System.out.println("step2 threadName(whenComplete): " + Thread.currentThread().getName());
System.out.println("正常完成: " + aLong);
System.out.println("异常完成: " + throwable.getMessage());
}
}, executor)
.exceptionally(new Function<Throwable, Long>() {
@Override
public Long apply(Throwable throwable) {
System.out.println("step3 threadName(exceptionally): " + Thread.currentThread().getName());
System.out.println("执行失败!" + throwable.getMessage());
return null;
}
});

System.out.println("future最终结果: " + future.get());
}
}

handle()

方法有三,表示无论此阶段正常完成还是异常完成,都会执行fn回调。

回调是BiFunction,参数为 此阶段的结果此阶段抛出的异常,这两个参数都可以为null,该函数式接口的方法返回值。

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 返回一个新的 CompletionStage,当此阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数来执行该阶段。
*
* 当此阶段完成时,使用此阶段的结果(如果没有,则为 null)和异常(如果没有,则为 null)作为参数调用给定的函数,并使用函数的结果来完成返回的阶段。
*
* 参数: fn - 用于计算返回的 CompletionStage 值的函数
* 类型参数:<U> – 函数的返回类型
* 返回:新的 CompletionStage
*/
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); // 使用此阶段的默认异步执行工具执行
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor); // 使用提供的执行器 Executor 执行

与 whenComplete 方法区别:

  • whenComplete 方法的回调没有返回值,handle 方法的回调有返回值。
  • whenComplete 方法返回的新阶段将保留此阶段的结果或异常,而 handle 方法返回的新阶段的泛型值是回调方法 fn 的返回值。也就是说 handle 方法回调函数的返回值会影响最终的计算结果。

应用场景:

  • 若此阶段正常完成,加工结果。
  • 若此阶段异常完成,返回一个默认值并回滚。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Test {
public static void main(String[] args) {
try {
System.out.println(Thread.currentThread().getName());
whenComplete();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.exit(0);
}

public static void whenComplete() throws ExecutionException, InterruptedException {
Executor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5, true));

CompletableFuture<Long> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("step1 threadName: " + Thread.currentThread().getName());
int i = 10 / 0;
return System.currentTimeMillis();
}, executor)
.handleAsync(new BiFunction<Long, Throwable, Long>() {
@Override
public Long apply(Long aLong, Throwable throwable) {
System.out.println("step2 threadName(handleAsync): " + Thread.currentThread().getName());
if (throwable == null) {
return aLong;
} else {
System.out.println(throwable.getMessage());
return -1L;
}
}
}, executor);

System.out.println("future最终结果: " + future.get());
}
}

toCompletableFuture()

1
2
3
4
5
6
7
8
9
10
/**
* 返回一个 CompletableFuture 保持与此阶段相同的完成属性。
* 如果此阶段已经是 CompletableFuture,则此方法可能会返回此阶段本身。否则,此方法的调用可能等效于 thenApply(x -> x),但返回 CompletableFuture 类型的实例。
*
* 不选择与其他实现互操作的 CompletionStage 实现可能会抛出 UnsupportedOperationException。意思就是,实现此接口时可以直接抛出 UnsupportedOperationException (不支持的操作异常)
*
* 返回:CompletableFuture
* 抛出:UnsupportedOperationException – 如果此实现不与 CompletableFuture 互操作
*/
public CompletableFuture<T> toCompletableFuture();

CompletableFuture 类是这样实现的

1
2
3
public CompletableFuture<T> toCompletableFuture() {
return this;
}

CompletableFuture

简介

CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调,支持流式(Stream)的计算处理,使 Java 在处理多任务的协同工作时更加顺畅便利。

比如:结果的合并;对结果异步的处理;阶段1产生的结果可以直接作为阶段2的入参,参与阶段2的计算,以此类推。这些功能放到 Futrue 身上也能实现,但现成 API 更简洁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 可以明确完成的 {@link Future}(设置其值和状态),并且可以用作 {@link CompletionStage},支持在其完成时触发的依赖功能和操作。
*
** 当两个或更多线程尝试调用 complete、completeExceptionally、cancel 方法时,只有其中一个成功。
*
* 除了直接操纵状态和结果的相关方法外,CompletableFuture 还使用以下策略实现接口 CompletionStage:
* 1、对非异步方法的依赖完成提供的操作,可以由完成当前 CompletableFuture 的线程执行,也可以由完成方法的任何其他调用者执行。
* 2、所有没有显式 Executor 参数的异步方法都使用 ForkJoinPool.commonPool() 执行(除非它不支持至少两个并行级别,在这种情况下,会创建一个新线程来运行每个任务)。
* 为了简化监控、调试和跟踪,所有生成的异步任务都是标记接口 CompletableFuture.AsynchronousCompletionTask 的实例。
* 3、所有 CompletionStage 接口的方法都独立于其他公共方法实现,因此一个方法的行为不会受到子类中其他方法的覆盖的影响。
*
* CompletableFuture 还使用以下策略实现 Future:
* 1、与 FutureTask 类不同,此类在对 boolean cancel(boolean mayInterruptIfRunning) 的实现过程中没有用到 mayInterruptIfRunning 参数,
* 无法直接控制正在执行的线程,因此cancle被视为另一种形式的异常完成。方法 cancel 与 completeExceptionally(new CancellationException()) 具
* 有相同的效果。方法 isCompletedExceptionally 可用于确定 CompletableFuture 是否以任何异常方式完成。
* 2、如果出现带有 CompletionException 的异常完成,get() 和 get(long, TimeUnit) 方法会抛出 ExecutionException,并且异常原因与对应
* 的 CompletionException 相同。为了在大多数情况下简化使用,此类还定义了 join() 和 getNow 方法,它们在这些情况下直接抛出 CompletionException。
*/
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}

如果任务很耗时,记得传 Executor,或者方法末尾加上 future.get(),因为 CompletableFuture 默认使用 ForkJoinPool,而 ForkJoinPool 里面的线程都是daemon线程,主线程跑完就 over 了。

静态方法

创建异步操作的四个静态方法:

1
2
3
4
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

前两个回调方法是 Runnable,无返回值。

后两个回调方法是 Supplier,有返回值。

入参有一个线程池 Executor,如果不指定就使用默认的线程池 {@link ForkJoinPool#commonPool()} 。如果机器是单核的,则默认使用 ThreadPerTaskExecutor,该类是一个内部类,每次执行execute都会创建一个新线程。详情请查看 CompletableFuture 源码。

创建一个没有任何操作的 CompletableFuture 对象:

1
2
3
4
5
6
7
8
/**
* 返回一个新的 CompletableFuture,它已经用给定的值完成了。
*
* 参数:value - 值
* 类型参数:<U> - 值的类型
* 返回:完成的 CompletableFuture
*/
public static <U> CompletableFuture<U> completedFuture(U value)

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 无返回值
*
* @return void
* @throws ExecutionException,InterruptedException
*/
public static void runAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("run end ...");
});
Void voidResult = future.get();
System.out.println("voidResult = " + voidResult); // 由于没有返回值,这里输出null
}

/**
* 有返回值
*
* @return void
* @throws ExecutionException,InterruptedException
*/
public static void supplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("run end ...");
return System.currentTimeMillis();
});
long completeTime = future.get();
System.out.println("completeTime = " + completeTime);
}

主动完成

complete()

1
2
3
4
5
6
7
/**
* 如果尚未完成,则将 get() 和相关方法返回的值设置为给定值。
*
* 参数:value - 结果值
* 返回:如果此调用导致此 CompletableFuture 转换为已完成状态,则为true,否则为false。
*/
public boolean complete(T value)

为什么叫CompletableFuture?CompletableFuture字面翻译过来,就是“可完成的Future”。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回。

completeExceptionally()

1
2
3
4
5
6
7
/**
* 如果尚未完成,则导致调用 get() 和相关方法抛出给定的异常。
*
* 参数:ex – 异常
* 返回: 如果此调用导致此 CompletableFuture 转换为已完成状态,则返回 true,否则返回 false
*/
public boolean completeExceptionally(Throwable ex)

isCompletedExceptionally()

1
2
3
4
5
6
/**
* 如果此CompletableFuture以任何方式异常完成,则返回true。可能的原因包括取消、显式调用完成异常以及突然终止完成阶段操作。
*
* 返回:如果此CompletableFuture异常完成,则返回true
*/
public boolean isCompletedExceptionally()

示例

complete() completeExceptionally() 只能调用一次,后续调用将被忽略。若想覆盖 future 之前的值请小心使用 CompletableFuture.obtrudeValue()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Test {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});

future.complete(100);
future.completeExceptionally(new Exception("手动异常完成"));

System.out.println(future.isCompletedExceptionally());
System.out.println(future.join());
}
}

主动完成-强加值

obtrudeValue()

1
2
3
4
5
6
7
/**
* 强制设置或重置方法 get() 和相关方法随后返回的值,无论是否已经完成。此方法仅用于错误恢复操作,即使在这种情况下,也可能导致使用已建立与
* 覆盖结果的持续依赖完成。
*
* 参数:value - 完成值
*/
public void obtrudeValue(T value)

obtrudeException()

1
2
3
4
5
6
7
8
/**
* 强制导致方法 get() 和相关方法的后续调用抛出给定异常,无论是否已经完成。此方法仅用于错误恢复操作,即使在这种情况下,也可能导致使用已建立与
* 覆盖结果的持续依赖完成。
*
* 参数:ex – 异常
* 抛出: NullPointerException – 如果异常为 null
*/
public void obtrudeException(Throwable ex)

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Test {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});

future.obtrudeValue(100);
future.obtrudeValue(200);
// future.obtrudeException(new Exception("强加手动异常完成1"));
// future.obtrudeException(new Exception("强加手动异常完成2"));

System.out.println(future.isCompletedExceptionally());
System.out.println(future.join());
}
}

getNow()

1
2
3
4
5
6
7
8
9
/**
* 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
*
* 参数: valueIfAbsent - 如果未完成则返回的值
* 返回:结果值,如果完成,否则给定值IfAbsent
* 抛出:CancellationException – 如果计算被取消
* CompletionException – 如果这个未来异常完成或完成计算发生异常
*/
public T getNow(T valueIfAbsent)

join()

1
2
3
4
5
6
7
8
9
/**
* 完成时返回结果值,如果异常完成则抛出(未经检查的)异常。为了更好地符合通用函数形式的使用,如果此CompletableFuture在执行中发生计算异常,
* 则此方法将抛出一个(未经检查的)CompletionException,并将底层异常作为其原因。
*
* 返回:结果值
* 抛出:CancellationException – 如果计算被取消
* CompletionException – 如果这个future异常完成或完成计算发生异常
*/
public T join()

最简单的方式就是通过构造函数创建一个CompletableFuture实例。如下代码所示。由于新创建的CompletableFuture还没有任何计算结果,这时调用join,当前线程会一直阻塞在这里。

1
2
3
CompletableFuture<String> future = new CompletableFuture<>();
String result = future.join();
System.out.println(result);

此时,如果在另外一个线程中,主动设置该CompletableFuture的值,则上面线程中的结果就能返回。

1
future.complete("test");

Future.get() 的区别:

1
V get() throws InterruptedException, ExecutionException;

get() 方法上声明了异常,所以必须捕获。

join() 方法上未声明异常,不用捕获,但可能抛出异常。

两者抛出的异常不同。

allOf()-与

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture。
* 如果任意给定的 CompletableFuture 异常完成,则返回的 CompletableFuture 也异常完成,并将此异常作为其原因。
* 如果所有给定的 CompletableFuture 正常完成,它们的结果(如果有)不会反映在返回的 CompletableFuture 中,但可以通过单独检查它们来获得。
* 如果没有提供 cfs,则返回一个 CompletableFuture 完成且值为 null。
*
* 该方法的应用之一是在继续程序之前等待一组独立的 CompletableFuture 完成,如:CompletableFuture.allOf(c1, c2, c3).join();。
*
* 参数:cfs – CompletableFutures
* 返回:当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
* 抛出: NullPointerException – 如果数组或其任何元素为空
*/
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

allOf() 的返回值是 CompletableFuture<Void> 类型,这是因为每个传入的 CompletableFuture 的返回值都可能不同,所以组合的结果无法用某种类型来表示,索性返回 Void 类型。那么,如何获取每个 CompletableFuture 的执行结果呢?

因为 allof() 没有返回值,所以通过 theApply(),给 allFutures 附上一个回调函数,在回调函数里面,调用每一个 Future 的 get()/join() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Test {
public static void main(String[] args) {
Executor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5, true));

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return 1;
}, executor);

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return "string";
}, executor);

CompletableFuture<Boolean> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return true;
}, executor);

CompletableFuture.allOf(future1, future2, future3).thenApply((res) -> {
System.out.println(Thread.currentThread().getName());
System.out.println("future1结果: " + future1.join());
System.out.println("future2结果: " + future2.join());
System.out.println("future3结果: " + future3.join());
return "结束";
});

System.exit(0);
}
}

anyOf()-或

1
2
3
4
5
6
7
8
/**
* 当任意给定的 CompletableFuture 完成时(包括正常完成和异常完成),返回一个和它结果或异常相同的新的 CompletableFuture。
*
* 参数:cfs – CompletableFutures
* 返回:一个新的CompletableFuture,当任意一个完成时,它与任何给定CompletableFuture的结果或异常一起完成
* 抛出: NullPointerException – 如果数组或其任何元素为空
*/
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

任意一个 CompletableFuture 结束,就返回一个和它结果或异常相同的新的 CompletableFuture。

由于每个 CompletableFuture 的返回值类型都可能不同,无法判断最终返回类型,所以 anyOf 的返回值是 CompletableFuture<Object> 类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Test {
public static void main(String[] args) {
Executor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5, true));

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
// try {
// TimeUnit.SECONDS.sleep(3);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
return 1;
}, executor);

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return "string";
}, executor);

CompletableFuture<Boolean> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return true;
}, executor);

CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1, future2, future3);
resultFuture.thenApply((res) -> {
System.out.println(Thread.currentThread().getName());
System.out.println(res.getClass() + " 结果: " + res);
return "结束";
});

System.exit(0);
}
}

getNumberOfDependents()

1
2
3
4
5
6
/**
* 返回其完成正在等待此 CompletableFuture 完成的 CompletableFuture 的估计数量。此方法设计用于监视系统状态,而不是用于同步控制。
*
* 返回:依赖的 CompletableFuture 的数量
*/
public int getNumberOfDependents()