CompletableFuture类使用
异步编程
异步编程相比多线程,更加强调在主任务中开启一条线程去执行另一个任务。得益于jdk8新增的CompletableFuture类,大幅度简化了异步编程的操作,简化了之前使用runable(无法获取返回结果)、Future(可以获取返回结果)类使用的操作。
注意:后文中的关于CompletableFuture的执行方法是否有返回值,特指执行异步的异步任务是否有返回值,具体体现在返回值CompletableFuture<T> 中的T是否是void,若无返回值,则时void,否则是具体的返回值类型。
CompletableFuture需要使用线程池,例如
1 2 3
| CompletableFuture<Void> runHello = CompletableFuture.runAsync(() -> { System.out.println("hello runAsync"); }, threadPool);
|
CompletableFuture大多数方法同runAsync一致,需要传入两个参数,第一个是需要异步运行的任务,第二个threadPool
是使用的线程池,如果不传第二个参数,则会使用默认的forkjoinpool(不建议使用)。在springboot项目中,我们的线程池一般都是创建配置类,生成配置的bean,需要使用时注入线程池的bean即可。
springboot中的线程池创建
引入依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
|
参数配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component;
@ConfigurationProperties(prefix = "microServer.thread") @Component @Data public class ThreadPoolConfigProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime; }
|
创建自定义线程池bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
@Configuration public class MyThreadConfig {
@Bean public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) { return new ThreadPoolExecutor(pool.getCoreSize(), pool.getMaxSize(), pool.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); }
}
|
创建异步任务
runAsync 方法
runAsync方法用于开启一个异步任务,此异步任务没有返回值。
1 2 3
| CompletableFuture<Void> runHello = CompletableFuture.runAsync(() -> { System.out.println("hello runAsync"); }, threadPool);
|
supplyAsync方法
supplyAsync方法用于开启一个异步任务,此异步任务有返回值。
1 2 3 4 5
| CompletableFuture<String> suphello = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply"); return "supply"; }, threadPool);
|
异步任务回调
注意:若回调链的某一个任务出现了异常,会导致后续的任务不执行。
thenRun、thenRunAsync方法
此两个方法用于在前一个任务执行完毕之后,再接着执行一个任务,且
- 链式调用的前面的方法报错时,此方法不会执行
- 不需要前一个任务的返回值
- 此任务也无返回值
- thenRun与thenRunAsync的区别是,thenRun使用前一个任务的线程池,thenRunAsync需要重新指定一个线程池,如果不指定,则为默认的forkjoinpool线程池(不建议)。后续的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是如此
1 2 3 4 5 6 7 8 9 10 11 12
| CompletableFuture<Void> runHello = CompletableFuture.runAsync(() -> { System.out.println("hello runAsync"); }, threadPool).thenRun(() -> { System.out.println("hello thenRun"); });
CompletableFuture<Void> runHello2 = CompletableFuture.runAsync(() -> { System.out.println("hello runAsync"); }, threadPool).thenRunAsync(() -> { System.out.println("hello thenRunAsync"); }, threadPool);
|
thenAccept、thenAcceptAsync方法
此两个方法用于在前一个任务执行完毕之后,再接着执行一个任务。且
- 链式调用的前面的方法报错时,此方法不会执行
- 接收前一个任务的返回值作为入参
- 此任务没有返回值
1 2 3 4 5 6
| CompletableFuture<Void> acphello = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply"); return "1"; }, threadPool).thenAccept(res -> { System.out.println("hello thenAccept"); });
|
thenApply、thenApplyAsync
此两个方法用于在前一个任务执行完毕之后,再接着执行一个任务。且
- 接收前一个任务的返回值作为入参,若前一个任务没有返回值,则为null(下同)
- 此任务有返回值
1 2 3 4 5 6 7 8 9
| CompletableFuture<String> apphello = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply"); return "1"; }, threadPool).thenApplyAsync(res -> { System.out.println("hello apphello"); return "2"; });
String a = apphello.get();
|
exceptionally
若某个任务出现异常后,捕获异常信息用于用户后续处理(回调),且
- 链式调用的前面的方法报错时,此方法才会执行
- 异常信息作为入参,
- 有返回值,且返回值类型必须和调用链上的最后一个正常的返回类型类型一致。
- 注意,此方法会消费在此方法调用链之前的异常,消费后只要此方法里、后续链式调用方法不再有异常,最终调用get方法获取返回值时不会报错
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| CompletableFuture<String> apphello = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply"); int a = 1/ 0; return "1"; }, threadPool).thenApplyAsync(res -> { System.out.println("hello apphello"); return "2"; }).exceptionally(e -> { System.out.println("异常:" + e); return "3"; });
String a = apphello.get();
|
whenComplete方法
一般用于前一个异步任务完成后的后续处理(回调),且
- 无论链式调用前面的方法是否执行成功,此方法都会执行。
- 接收两个参数,前一个任务的返回值作为入参,链式调用上的异常作为入参。
- 若异常被exceptionally消费了,则此处获取的异常入参会变成null。
- 此方法不影响异步的返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| CompletableFuture<String> apphello2 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply"); int a = 1/ 0; return "1"; }, threadPool).thenApplyAsync(res -> { System.out.println("hello apphello"); return "2"; }).whenComplete((res, exception) -> { System.out.println("complete"); });
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| CompletableFuture<String> apphello2 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply"); int a = 1/ 0; return "1"; }, threadPool).thenApplyAsync(res -> { System.out.println("hello apphello"); return "2"; }).exceptionally(e -> { System.out.println("异常:" + e); return "3"; }).whenComplete((res, exception) -> { System.out.println("complete"); });
String a = apphello2.get();
|
handle方法
一般用于前一个异步任务完成后的后续处理(回调),且
- 无论链式调用前面的方法是否执行成功,此方法都会执行。
- 接收两个参数,前一个任务的返回值作为入参,链式调用上的异常作为入参。
- 若异常被exceptionally消费了,则此处获取的异常入参会变成null。
- 此方法有的返回值,且影响异步返回值。
- 此方法会消费掉异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| CompletableFuture<String> apphello3 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply"); int a = 1/ 0; return "1"; }, threadPool).thenApplyAsync(res -> { System.out.println("hello apphello"); return "2"; }).handle((res, exception) -> { System.out.println("complete"); return "4"; });
String a = apphello3.get();
|
多任务组合处理
两个任务必须都完成
runAfterBoth、runAfterBothAsync方法
两个异步任务都执行成功后,才执行。且
- 两个异步任务的执行不会相互影响(其中一个报错,另一个依旧会执行)。
- 若有一个异步任务抛出了异常,则此方法不会执行
- 不接受前面两个方法的入参,且没有返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| CompletableFuture<String> appHello1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply1"); return "1"; }, threadPool);
CompletableFuture<String> appHello2 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply2"); return "2"; }, threadPool);
CompletableFuture<Void> future = appHello1.runAfterBoth(appHello2, () -> { System.out.println("runAfterBoth"); });
|
thenAcceptBoth、thenAcceptBothAsync方法
两个异步任务都执行成功后,才执行。且
- 两个异步任务的执行不会相互影响(其中一个报错,另一个依旧会执行)。
- 若有一个异步任务抛出了异常,则此方法不会执行
- 接受前面两个方法的入参,但是没有返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| CompletableFuture<String> appHello1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply1"); return "1"; }, threadPool);
CompletableFuture<String> appHello2 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply2"); return "2"; }, threadPool);
CompletableFuture<Void> future = appHello1.thenAcceptBoth(appHello2, (a, b) -> { System.out.println("thenAcceptBoth"); System.out.println(a); System.out.println(b); });
|
thenCombine、thenCombineAsync方法
两个异步任务都执行成功后,才执行。且
- 两个异步任务的执行不会相互影响(其中一个报错,另一个依旧会执行)。
- 若有一个异步任务抛出了异常,则此方法不会执行
- 接受前面两个方法的入参,但是有返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| CompletableFuture<String> appHello1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply1"); return "1"; }, threadPool);
CompletableFuture<String> appHello2 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply2"); return "2"; }, threadPool);
CompletableFuture<String> thenCombine = appHello1.thenCombine(appHello2, (a, b) -> { System.out.println("thenCombine"); System.out.println(a); System.out.println(b); return "3"; });
String a = thenCombine.get();
|
两个任务中任意一个完成
与都完成类似,分别有三组、六个方法(每组带Async、不带Async两个)。代码只给出applyToEither例子。
- runAfterEither: 不会把执行结果当做方法入参,且没有返回值。
- acceptEither: 会将最先执行完任务的返回值,作为方法入参,传递到指定方法中,且无返回值、
- applyToEither:会将最先执行完任务的返回值,作为方法入参,传递到指定方法中,且有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| CompletableFuture<String> appHello3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } int a = 1/ 0; System.out.println("hello supply3"); return "1"; }, threadPool);
CompletableFuture<String> appHello4 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply4"); return "2"; }, threadPool);
CompletableFuture<String> applyToEither = appHello3.applyToEither(appHello4, a -> { System.out.println(a); return "3"; });
String a = applyToEither.get();
|
多个任务都完成
allOf方法,此方法没有返回值,因此有些需要返回值的调用没有办法使用,(一般也不会这样用)。
- 多个异步任务的执行不会相互影响(其中一个报错,另一个依旧会执行)。
- 若有一个异步任务抛出了异常,则此方法不会执行
- allOf方法没有返回值,因此,接着调用某些需要返回值的方法可能会报错,例如handle方法。
用法:接着调用whenComplete方法,在多个异步任务都执行完毕后执行(异步执行)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| CompletableFuture<String> appHello3 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply3"); return "1"; }, threadPool);
CompletableFuture<String> appHello4 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply4"); return "2"; }, threadPool);
CompletableFuture.allOf(appHello3, appHello4).whenComplete((res, exception) -> { System.out.println("4"); });
|
用法:使用join方法,在多个任务都执行完毕后,在主线程中执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| CompletableFuture<String> appHello3 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply3"); return "1"; }, threadPool);
CompletableFuture<String> appHello4 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply4"); return "2"; }, threadPool);
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(appHello3, appHello4);
try { allOfFuture.join(); } catch (Exception ex) { }
|
多个任务任意一个完成
anyOf方法,相比于allOf方法,此方法有返回值,类型未Object,同两个任务任意一个完成一样,会接收最先完成的任务的返回值,其他的都会被丢弃掉,即使出现异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| CompletableFuture<String> appHello3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } int a = 1/ 0; System.out.println("hello supply3"); return "1"; }, threadPool);
CompletableFuture<String> appHello4 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply4"); return "2"; }, threadPool);
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(appHello3, appHello4);
anyOfFuture.handle((res, e) -> { System.out.println(res); System.out.println(e.getMessage()); return "4"; });
|
组合任务处理
thenCompose方法
用于组合两个CompletableFuture方法,并将前一个CompletableFuture方法的返回值作为入参传入下一个CompletableFuture。有先后依赖关系。
1 2 3 4 5 6 7 8 9 10 11 12 13
| CompletableFuture<String> appHello5 = CompletableFuture.supplyAsync(() -> { System.out.println("hello supply5"); return "1"; }, threadPool);
CompletableFuture<String> future1 = appHello5.thenCompose((res -> CompletableFuture.supplyAsync(() -> { System.out.println(res + ":" + "hello supply6"); return "2"; }, threadPool)));
String a = future1.get();
|
与thenApply的区别
参考自:java - CompletableFuture | thenApply vs thenCompose - Stack Overflow
supplyAsync类似于stream中的map方法,thenCompose类似有stream的flatMap方法(将结果展平)。
- 如果异步调用方法,返回了一个CompletableFuture<T>,则应该使用thenCompose,若使用supplyAsync,则会返回一个CompletableFuture<CompletableFuture<T>>对象。
- 如果异步调用的方法,返回了一个T对象,则应该使用supplyAsync。最终会返回一个CompletableFuture<T>对象
1 2 3 4 5 6 7 8
|
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1) .thenApply(x -> x+1);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1) .thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));
|
get、join方法
get、join方法用于获取CompletableFuture中包装的具体的值的类型,若CompletableFuture中有异常,则会报错。
get方法会获取返回值,join不会。
注意:get方法会阻塞当前调用的”主线程“。建议使用get的时候,加一个超时时间。CompletableFuture线程池策略最好使用AbortPolicy
1
| String result = future.get(5, TimeUnit.SECONDS);
|
异常处理
- 使用
whenComplete
方法可以在任务完成时触发回调函数,并正确地处理异常,而不是让异常被吞噬或丢失。
- 使用
exceptionally
方法可以处理异常并重新抛出,以便异常能够传播到后续阶段,而不是让异常被忽略或终止。
- 使用
handle
方法可以处理正常的返回结果和异常,并返回一个新的结果,而不是让异常影响正常的业务逻辑。
- 使用
CompletableFuture.allOf
方法可以组合多个 CompletableFuture
,并统一处理所有任务的异常,而不是让异常处理过于冗长或重复。
实际使用中可以使用现成的异步框架,来实现异步编排。