java异步编程

  1. 1. 异步编程
  2. 2. springboot中的线程池创建
    1. 2.1. 引入依赖
    2. 2.2. 参数配置类
    3. 2.3. 创建自定义线程池bean
  3. 3. 创建异步任务
    1. 3.1. runAsync 方法
    2. 3.2. supplyAsync方法
  4. 4. 异步任务回调
    1. 4.1. thenRun、thenRunAsync方法
    2. 4.2. thenAccept、thenAcceptAsync方法
    3. 4.3. thenApply、thenApplyAsync
    4. 4.4. exceptionally
    5. 4.5. whenComplete方法
    6. 4.6. handle方法
  5. 5. 多任务组合处理
    1. 5.1. 两个任务必须都完成
      1. 5.1.1. runAfterBoth、runAfterBothAsync方法
      2. 5.1.2. thenAcceptBoth、thenAcceptBothAsync方法
      3. 5.1.3. thenCombine、thenCombineAsync方法
    2. 5.2. 两个任务中任意一个完成
    3. 5.3. 多个任务都完成
    4. 5.4. 多个任务任意一个完成
  6. 6. 组合任务处理
    1. 6.1. thenCompose方法
    2. 6.2. 与thenApply的区别
  7. 7. get、join方法
  8. 8. 异常处理

CompletableFuture类使用

异步编程

异步编程相比多线程,更加强调在主任务中开启一条线程去执行另一个任务。得益于jdk8新增的CompletableFuture类,大幅度简化了异步编程的操作,简化了之前使用runable(无法获取返回结果)、Future(可以获取返回结果)类使用的操作。

注意:后文中的关于CompletableFuture的执行方法是否有返回值,特指执行异步的异步任务是否有返回值,具体体现在返回值CompletableFuture<T> 中的T是否是void,若无返回值,则时void,否则是具体的返回值类型。

CompletableFuture需要使用线程池,例如

1
2
3
CompletableFuture<Void> runHello = CompletableFuture.runAsync(() -> {
System.out.println("hello runAsync");
}, threadPool);

CompletableFuture大多数方法同runAsync一致,需要传入两个参数,第一个是需要异步运行的任务,第二个threadPool是使用的线程池,如果不传第二个参数,则会使用默认的forkjoinpool(不建议使用)。在springboot项目中,我们的线程池一般都是创建配置类,生成配置的bean,需要使用时注入线程池的bean即可。

springboot中的线程池创建

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>

参数配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
* 线程池参数配置类
*/
@ConfigurationProperties(prefix = "microServer.thread") // 取配置文件中的参数
@Component // 如果不加入容器,其他类取ThreadPoolConfigProperties类中的值需要加入额外注解
@Data
public class ThreadPoolConfigProperties {

private Integer coreSize;

private Integer maxSize;

private Integer keepAliveTime;
}

创建自定义线程池bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 线程池配置
*/
// @EnableConfigurationProperties(ThreadPoolConfigProperties.class) // (ThreadPoolConfigProperties)配置类不写component 加入容器需要加入
@Configuration
public class MyThreadConfig {

/**
* 创建并返回线程池bean对象加入到容器
*/
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
return new ThreadPoolExecutor(pool.getCoreSize(), pool.getMaxSize(), pool.getKeepAliveTime(),
TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
}

}

创建异步任务

runAsync 方法

runAsync方法用于开启一个异步任务,此异步任务没有返回值。

1
2
3
CompletableFuture<Void> runHello = CompletableFuture.runAsync(() -> {
System.out.println("hello runAsync");
}, threadPool);

supplyAsync方法

supplyAsync方法用于开启一个异步任务,此异步任务有返回值。

1
2
3
4
5
CompletableFuture<String> suphello = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply");
// 异步任务的返回值
return "supply";
}, threadPool);

异步任务回调

注意:若回调链的某一个任务出现了异常,会导致后续的任务不执行。

thenRun、thenRunAsync方法

此两个方法用于在前一个任务执行完毕之后,再接着执行一个任务,且

  1. 链式调用的前面的方法报错时,此方法不会执行
  2. 不需要前一个任务的返回值
  3. 此任务也无返回值
  4. thenRun与thenRunAsync的区别是,thenRun使用前一个任务的线程池,thenRunAsync需要重新指定一个线程池,如果不指定,则为默认的forkjoinpool线程池(不建议)。后续的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是如此
1
2
3
4
5
6
7
8
9
10
11
12
CompletableFuture<Void> runHello = CompletableFuture.runAsync(() -> {
System.out.println("hello runAsync");
}, threadPool).thenRun(() -> {
System.out.println("hello thenRun");
});

CompletableFuture<Void> runHello2 = CompletableFuture.runAsync(() -> {
System.out.println("hello runAsync");
}, threadPool).thenRunAsync(() -> {
System.out.println("hello thenRunAsync");
// 需要重新指定线程池
}, threadPool);

thenAccept、thenAcceptAsync方法

此两个方法用于在前一个任务执行完毕之后,再接着执行一个任务。且

  1. 链式调用的前面的方法报错时,此方法不会执行
  2. 接收前一个任务的返回值作为入参
  3. 此任务没有返回值
1
2
3
4
5
6
CompletableFuture<Void> acphello = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply");
return "1";
}, threadPool).thenAccept(res -> {
System.out.println("hello thenAccept");
});

thenApply、thenApplyAsync

此两个方法用于在前一个任务执行完毕之后,再接着执行一个任务。且

  1. 接收前一个任务的返回值作为入参,若前一个任务没有返回值,则为null(下同)
  2. 此任务有返回值
1
2
3
4
5
6
7
8
9
CompletableFuture<String> apphello = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply");
return "1";
}, threadPool).thenApplyAsync(res -> {
System.out.println("hello apphello");
return "2";
});
// a = "2"
String a = apphello.get();

exceptionally

若某个任务出现异常后,捕获异常信息用于用户后续处理(回调),且

  1. 链式调用的前面的方法报错时,此方法才会执行
  2. 异常信息作为入参,
  3. 有返回值,且返回值类型必须和调用链上的最后一个正常的返回类型类型一致。
  4. 注意,此方法会消费在此方法调用链之前的异常,消费后只要此方法里、后续链式调用方法不再有异常,最终调用get方法获取返回值时不会报错
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CompletableFuture<String> apphello = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply");
// 异常代码
int a = 1/ 0;
return "1";
}, threadPool).thenApplyAsync(res -> {
// 由于调用链上的前一个任务出现了异常,此任务不会执行
System.out.println("hello apphello");
return "2";
}).exceptionally(e -> {
System.out.println("异常:" + e);
// 返回类型要与调用链的最后一个返回类型保持一致
return "3";
});

// 若exceptionally方法之前没有报错,则a的值是2,否则为exceptionally方法返回的值(此处为3)
// 若没有调用exceptionally方法消费异常,此get方法会报错
String a = apphello.get();

whenComplete方法

一般用于前一个异步任务完成后的后续处理(回调),且

  1. 无论链式调用前面的方法是否执行成功,此方法都会执行。
  2. 接收两个参数,前一个任务的返回值作为入参,链式调用上的异常作为入参。
  3. 若异常被exceptionally消费了,则此处获取的异常入参会变成null。
  4. 此方法不影响异步的返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
CompletableFuture<String> apphello2 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply");
// 异常代码
int a = 1/ 0;
return "1";
}, threadPool).thenApplyAsync(res -> {
// 不会执行
System.out.println("hello apphello");
return "2";
}).whenComplete((res, exception) -> {
// 此处依然会正常调用,其中入参res(返回值)会变成null,exception为具体的异常
// 注意:此方法无法消费异常,后续的get方法依旧会报错。
System.out.println("complete");
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CompletableFuture<String> apphello2 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply");
// 异常代码
int a = 1/ 0;
return "1";
}, threadPool).thenApplyAsync(res -> {
// 不会执行
System.out.println("hello apphello");
return "2";
}).exceptionally(e -> {
// 将异常消费掉
System.out.println("异常:" + e);
// 返回"3"
return "3";
}).whenComplete((res, exception) -> {
// 入参res是返回值("3"),异常被消费掉后,exception为null
System.out.println("complete");
});

// 无异常报错(异常已被exceptionally消费掉了),a = "3"
String a = apphello2.get();

handle方法

一般用于前一个异步任务完成后的后续处理(回调),且

  1. 无论链式调用前面的方法是否执行成功,此方法都会执行。
  2. 接收两个参数,前一个任务的返回值作为入参,链式调用上的异常作为入参。
  3. 若异常被exceptionally消费了,则此处获取的异常入参会变成null。
  4. 此方法有的返回值,且影响异步返回值。
  5. 此方法会消费掉异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CompletableFuture<String> apphello3 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply");
// 异常代码
int a = 1/ 0;
return "1";
}, threadPool).thenApplyAsync(res -> {
// 不会执行
System.out.println("hello apphello");
return "2";
}).handle((res, exception) -> {
// 会消费掉异常,且有返回值
System.out.println("complete");
return "4";
});

// 异常被handle消费掉了,不会报错,a = "4"
String a = apphello3.get();

多任务组合处理

两个任务必须都完成

runAfterBoth、runAfterBothAsync方法

两个异步任务都执行成功后,才执行。且

  1. 两个异步任务的执行不会相互影响(其中一个报错,另一个依旧会执行)。
  2. 若有一个异步任务抛出了异常,则此方法不会执行
  3. 不接受前面两个方法的入参,且没有返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 异步任务1
CompletableFuture<String> appHello1 = CompletableFuture.supplyAsync(() -> {
// 异常代码
//int a = 1/ 0;
System.out.println("hello supply1");
return "1";
}, threadPool);

// 异步任务2
CompletableFuture<String> appHello2 = CompletableFuture.supplyAsync(() -> {
// 异常代码
System.out.println("hello supply2");
return "2";
}, threadPool);

// 通过 任务1future.runAfterBoth(任务2future, ->{想要执行的任务}) 调用
// 表示在任务1、任务2都正常执行成功的情况下,再执行想要执行的任务。
CompletableFuture<Void> future = appHello1.runAfterBoth(appHello2, () -> {
System.out.println("runAfterBoth");
});

thenAcceptBoth、thenAcceptBothAsync方法

两个异步任务都执行成功后,才执行。且

  1. 两个异步任务的执行不会相互影响(其中一个报错,另一个依旧会执行)。
  2. 若有一个异步任务抛出了异常,则此方法不会执行
  3. 接受前面两个方法的入参,但是没有返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 异步任务1
CompletableFuture<String> appHello1 = CompletableFuture.supplyAsync(() -> {
// 异常代码
//int a = 1/ 0;
System.out.println("hello supply1");
return "1";
}, threadPool);

// 异步任务2
CompletableFuture<String> appHello2 = CompletableFuture.supplyAsync(() -> {
// 异常代码
System.out.println("hello supply2");
return "2";
}, threadPool);

// 通过 任务1future.thenAcceptBoth(任务2future, ->{想要执行的任务}) 调用,接收两个入参,分别为两个任务的返回值
// 表示在任务1、任务2都正常执行成功的情况下,再执行想要执行的任务。
CompletableFuture<Void> future = appHello1.thenAcceptBoth(appHello2, (a, b) -> {
System.out.println("thenAcceptBoth");
// a为appHello1的返回值,此处 a = "1"
System.out.println(a);
// b为appHello2的返回值,此处 b = "2"
System.out.println(b);
});

thenCombine、thenCombineAsync方法

两个异步任务都执行成功后,才执行。且

  1. 两个异步任务的执行不会相互影响(其中一个报错,另一个依旧会执行)。
  2. 若有一个异步任务抛出了异常,则此方法不会执行
  3. 接受前面两个方法的入参,但是有返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 异步任务1
CompletableFuture<String> appHello1 = CompletableFuture.supplyAsync(() -> {
// 异常代码
//int a = 1/ 0;
System.out.println("hello supply1");
return "1";
}, threadPool);

// 异步任务2
CompletableFuture<String> appHello2 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply2");
return "2";
}, threadPool);

// 通过 任务1future.thenCombine(任务2future, ->{想要执行的任务}) 调用,接收两个入参,分别为两个任务的返回值
// 表示在任务1、任务2都正常执行成功的情况下,再执行想要执行的任务。
CompletableFuture<String> thenCombine = appHello1.thenCombine(appHello2, (a, b) -> {
System.out.println("thenCombine");
// a为appHello1的返回值,此处 a = "1"
System.out.println(a);
// b为appHello2的返回值,此处 b = "2"
System.out.println(b);
return "3";
});

// 为thenCombine的返回值,此处 a = "3"。
String a = thenCombine.get();

两个任务中任意一个完成

与都完成类似,分别有三组、六个方法(每组带Async、不带Async两个)。代码只给出applyToEither例子。

  1. runAfterEither: 不会把执行结果当做方法入参,且没有返回值。
  2. acceptEither: 会将最先执行完任务的返回值,作为方法入参,传递到指定方法中,且无返回值、
  3. applyToEither:会将最先执行完任务的返回值,作为方法入参,传递到指定方法中,且有返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 此方法执行出现异常,但是在appHello4执行完毕后才会出现异常
CompletableFuture<String> appHello3 = CompletableFuture.supplyAsync(() -> {
try {
// 模拟执行用时,让appHello3方法耗时大于appHello4方法
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 异常代码
int a = 1/ 0;
System.out.println("hello supply3");
return "1";
}, threadPool);

// 此异步方法执行正常,且耗时更短
CompletableFuture<String> appHello4 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply4");
return "2";
}, threadPool);

// 在两个方法任意一个执行完毕后,接收一个返回值作为入参a,谁先执行完毕,就是谁的返回值。
// 假设是appHello3先执行完毕,且appHello3抛出了异常,此方法不会执行,且最后的get会报错
// 现在的代码,appHello3报错了,但是报错之前,appHello4已经执行完毕且返回了值,因此applyToEither会执行,且后get方法不会报错
CompletableFuture<String> applyToEither = appHello3.applyToEither(appHello4, a -> {
System.out.println(a);
return "3";
});

// 虽然appHello3报错了,但是在其报错之前appHello4就已执行成功且返回了值。
// applyToEither正常执行,且没有报错,此处a = "3"
String a = applyToEither.get();

多个任务都完成

allOf方法,此方法没有返回值,因此有些需要返回值的调用没有办法使用,(一般也不会这样用)。

  1. 多个异步任务的执行不会相互影响(其中一个报错,另一个依旧会执行)。
  2. 若有一个异步任务抛出了异常,则此方法不会执行
  3. allOf方法没有返回值,因此,接着调用某些需要返回值的方法可能会报错,例如handle方法。

用法:接着调用whenComplete方法,在多个异步任务都执行完毕后执行(异步执行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CompletableFuture<String> appHello3 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply3");
return "1";
}, threadPool);

CompletableFuture<String> appHello4 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply4");
return "2";
}, threadPool);

// 调用whenComplete方法,在两个任务都执行完毕后执行
CompletableFuture.allOf(appHello3, appHello4).whenComplete((res, exception) -> {
System.out.println("4");
});

用法:使用join方法,在多个任务都执行完毕后,在主线程中执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CompletableFuture<String> appHello3 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply3");
return "1";
}, threadPool);

CompletableFuture<String> appHello4 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply4");
return "2";
}, threadPool);

CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(appHello3, appHello4);

try {
// 在主线程中等待多个allOf的异步任务都执行完毕(阻塞主线程)
allOfFuture.join();
// do something
} catch (Exception ex) {
// when wrong, do something
}

多个任务任意一个完成

anyOf方法,相比于allOf方法,此方法有返回值,类型未Object,同两个任务任意一个完成一样,会接收最先完成的任务的返回值,其他的都会被丢弃掉,即使出现异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 此方法执行出现异常,但是在appHello4执行完毕后才会出现异常
CompletableFuture<String> appHello3 = CompletableFuture.supplyAsync(() -> {
// 异常代码
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
int a = 1/ 0;
System.out.println("hello supply3");
return "1";
}, threadPool);

// 此异步方法执行正常,且耗时更短
CompletableFuture<String> appHello4 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply4");
return "2";
}, threadPool);

// 先返回的appHello4没有报错,此anyOfFuture的后续调用也不会出问题,返回值为appHello4的返回值。类型为Object
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(appHello3, appHello4);
// 有返回值,可以正常调用handle方法
anyOfFuture.handle((res, e) -> {
System.out.println(res);
System.out.println(e.getMessage());
return "4";
});

组合任务处理

thenCompose方法

用于组合两个CompletableFuture方法,并将前一个CompletableFuture方法的返回值作为入参传入下一个CompletableFuture。有先后依赖关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
CompletableFuture<String> appHello5 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello supply5");
return "1";
}, threadPool);

CompletableFuture<String> future1 = appHello5.thenCompose((res -> CompletableFuture.supplyAsync(() -> {
// res是appHello5的返回值,这里是"1"
System.out.println(res + ":" + "hello supply6");
return "2";
}, threadPool)));

// a = "1:hello supply6"
String a = future1.get();

与thenApply的区别

参考自:java - CompletableFuture | thenApply vs thenCompose - Stack Overflow

supplyAsync类似于stream中的map方法,thenCompose类似有stream的flatMap方法(将结果展平)。

  • 如果异步调用方法,返回了一个CompletableFuture<T>,则应该使用thenCompose,若使用supplyAsync,则会返回一个CompletableFuture<CompletableFuture<T>>对象。
  • 如果异步调用的方法,返回了一个T对象,则应该使用supplyAsync。最终会返回一个CompletableFuture<T>对象
1
2
3
4
5
6
7
8
// thenApply 的返回值,会将原本的返回值使用CompletableFuture包装起来,
// 例如thenApply 中的调用方法的原本返回的是x+1(一个Integer),然后会将其包装返回成CompletableFuture<Integer>
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1)
.thenApply(x -> x+1);

// thenCompose 中的调用方法的原本返回值,是一个CompletableFuture<Integer>,直接将其返回
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1)
.thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));

get、join方法

get、join方法用于获取CompletableFuture中包装的具体的值的类型,若CompletableFuture中有异常,则会报错。

get方法会获取返回值,join不会。

注意:get方法会阻塞当前调用的”主线程“。建议使用get的时候,加一个超时时间。CompletableFuture线程池策略最好使用AbortPolicy

1
String result = future.get(5, TimeUnit.SECONDS);

异常处理

  • 使用 whenComplete 方法可以在任务完成时触发回调函数,并正确地处理异常,而不是让异常被吞噬或丢失。
  • 使用 exceptionally 方法可以处理异常并重新抛出,以便异常能够传播到后续阶段,而不是让异常被忽略或终止。
  • 使用 handle 方法可以处理正常的返回结果和异常,并返回一个新的结果,而不是让异常影响正常的业务逻辑。
  • 使用 CompletableFuture.allOf 方法可以组合多个 CompletableFuture,并统一处理所有任务的异常,而不是让异常处理过于冗长或重复。

实际使用中可以使用现成的异步框架,来实现异步编排。