Future和Callable接口

Future接口(FutureTask实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Future接口可以为主线程开一个异步线程,专门为主线程处理耗时和费力的复杂业务。


Callable接口中定义了需要有返回的任务需要实现的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
class Thread1 implements Callable{
@Override
public Object call() throws Exception {
return null;
}
}

class Thread2 implements Runnable{
@Override
public void run() {

}
}

与使用Runnable接口相比, Callable接口功能更强大些:

  • 方法可以有返回值

  • 方法可以抛出异常

  • 支持泛型的返回值

FutureTask类

基本介绍

现在我们需要有异步多线程执行任务且有返回结果,需要满足三个特点:多线程/有返回/异步任务

分析:

  • Thread类可以创建多线程,其构造方法可以传实现了Runnable接口的类
  • RunnableFuture接口既继承了Runnable接口,也继承了Future接口(异步任务
  • FutureTask类实现了RunnableFuture接口,而且其构造方法可以传入Callable接口(有返回值)或Runnable接口的实现类


实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new MyThread());
Thread thread = new Thread(futureTask, "thread");
thread.start();
System.out.println(futureTask.get());//接收返回值
}
}

class MyThread implements Callable {
@Override
public Object call() throws Exception {
System.out.println("-----come in call()");
return "返回值";
}
}

优缺点

FutureTask实现类的优点是异步多线程(Future)+线程池进行任务配合,能显著提高程序的执行效率。但也有缺点:

  • 一旦调用get()方法,不管是否计算完成都会导致阻塞(所以一般get方法放到最后)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("-----come in FutureTask");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task over";
});

Thread t1 = new Thread(futureTask, "t1");
t1.start();

//3秒钟后才出来结果,还没有计算你提前来拿(只要一调用get方法,对于结果就是不见不散,会导致阻塞)
//System.out.println(Thread.currentThread().getName()+"\t"+futureTask.get());

//3秒钟后才出来结果,我只想等待1秒钟,过时不候, 超时会抛出Timeout异常
System.out.println(Thread.currentThread().getName() + "\t" + futureTask.get(1L, TimeUnit.SECONDS));

System.out.println(Thread.currentThread().getName() + "\t" + " run... here");

}
}
  • 如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞,但轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("-----come in FutureTask");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task over";
});

new Thread(futureTask, "t1").start();

System.out.println(Thread.currentThread().getName() + "\t" + "线程完成任务");

/**
* 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果
*/
while (true) {
if (futureTask.isDone()) {
System.out.println(futureTask.get());
break;
}
}
}
}

CompletableFuture类

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无谓的CPU资源。因此,JDK8 设计出CompletableFuture实现类。

基本介绍

CompletableFuture类:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

  • 在Java 8中, CompletableFuture 提供了非常强大的 Future 的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合CompletableFuture 的方法。

  • 它可能代表一个明确完成的 Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些动作。

  • 它实现了 Future 和 CompletionStage 接口。


CompletionStage接口:

  • CompletionStage代表异步计算过程中的某一个阶段, 一个阶段完成以后可能会触发另外一个阶段

  • 一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:stage.thenApply(x->square(x) ) .then Accept(x->System.out.print(x) ) .then Run() ->System.out.print In() )

  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

四大静态方法

  • runAsync 无返回值

  • supplyAsync 有返回值(Supplier供给型函数式接口,没有输入参数,需要返回值)

  • 没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码

  • 如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}

public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}

runAsync case:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----task is over");
//默认的线程池:ForkJoinPool.commonPool-worker-9
// });
}, Executors.newFixedThreadPool(3));//指定的线程池:pool-1-thread-1
System.out.println(future.get());
}
}

supplyAsync case:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task over";
//默认的线程池:ForkJoinPool.commonPool-worker-9
// });
}, Executors.newFixedThreadPool(3));//指定的线程池:pool-1-thread-1

System.out.println(completableFuture.get());
}
}

通用异步编程

  • CompletableFuture 通过whenComplete减少阻塞和轮询(自动回调)
  • 主线程结束后,CompletableFuture 默认使用的线程池会立即关闭,我们可以使用自定义的线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
try {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "--------副线程come in");
int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----结果---异常判断值---" + result);
//---------------------异常情况的演示--------------------------------------
if (result > 2) {
int i = 10 / 0;//我们主动的给一个异常情况
}
//------------------------------------------------------------------
return result;
}, threadPool).whenComplete((v, e) -> {//没有异常,v是值,e是异常
if (e == null) {
System.out.println("------------------计算完成,更新系统updateValue" + v);
}
}).exceptionally(e -> {//有异常的情况
e.printStackTrace();
System.out.println("异常情况" + e.getCause() + "\t" + e.getMessage());
return null;
});

//线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}

join和get区别

  • 功能几乎一样,区别在于编码时是否需要抛出异常
    • get()方法需要抛出异常
    • join()方法不需要抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "hello 12345";
});
System.out.println(completableFuture.get());
}
}

public class Main {
public static void main(String[] args){//不抛出异常
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "hello 12345";
});
System.out.println(completableFuture.join());
}
}

优点

  • 异步任务结束或者出错时,会自动回调某个对象的方法;
  • 主线程设置好毁掉后,不再关心异步任务的执行,异步任务之间可以顺序执行。

函数式接口

定义:任何接口,如果只包含唯一一个抽象方法,那么它就是一个函数式接口。对于函数式接口,我们可以通过lambda表达式来创建该接口的对象。

常见函数式接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@FunctionalInterface
public interface Runnable {
public abstract void run();
}

@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
}

@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}

@FunctionalInterface
public interface Supplier<T> {
T get();
}

@FunctionalInterface
public interface BiConsumer<T, U> {
void accept(T t, U u);
}

总结:

函数式接口名称 方法名称 参数 返回值
Runnable run 无参数 无返回值
Function apply 1个参数 有返回值
Consume accept 1个参数 无返回值
Supplier get 没有参数 有返回值
Biconsumer accept 2个参数 无返回值

电商比价案例

经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。

同理,对于分布式微服务的调用,按照实际业务,如果是无关联step by step的业务,可以尝试是否可以多箭齐发,同时调用。


案例需求需求说明:
同一款产品,同时搜索出该款产品在各大电商平台的售价;

输出返回:
出来结果希望是同款产品的在不同地方的价格清单列表, 返回一个List<String>
《mysql》in jd price is 88.05
《mysql》in dang dang price is 86.11
《mysql》in tao bao price is 90.43

解决方案:

  1. stepbystep, 按部就班, 查完京东查淘宝, 查完淘宝查天猫
  2. all in,万箭齐发,一口气多线程异步任务同时查询

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class Main {
static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("tmall"),
new NetMall("pdd"),
new NetMall("taobao")
);

public static List<String> findPriceSync(List<NetMall> list, String productName) {
return list.stream().map(mall -> String.format(productName + " %s price is %.2f",
mall.getNetMallName(),
mall.getPriceByName(productName)))
.collect(Collectors.toList());
}

public static List<String> findPriceASync(List<NetMall> list, String productName) {
return list.stream().map(mall -> CompletableFuture.supplyAsync(() -> String.format(productName + " %s price is %.2f",
mall.getNetMallName(),
mall.getPriceByName(productName))))
.collect(Collectors.toList())//和下面那行都不能省略,不然会串行调用join
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}


public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> list1 = findPriceSync(list, "thinking in java");
for (String element : list1) {
System.out.println(element);
}
long endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");

long startTime2 = System.currentTimeMillis();
List<String> list2 = findPriceASync(list, "thinking in java");
for (String element : list2) {
System.out.println(element);
}
long endTime2 = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime2 - startTime2) + " 毫秒");
}
}

class NetMall {
private String netMallName;

public String getNetMallName() {
return netMallName;
}

public NetMall(String netMallName) {
this.netMallName = netMallName;
}

public double getPriceByName(String productName) {
return calcPrice(productName);
}

private double calcPrice(String productName) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ThreadLocalRandom.current().nextDouble() + productName.charAt(0);
}
}

CompletableFuture常用方法

获得结果和触发计算

获取结果:

  • public T get(), 不见不散,容易阻塞
  • public T get(long timeout,TimeUnit unit),过时不候,超时会抛出异常
  • public T join(), 类似于get(),区别在于是否需要抛出异常
  • public T getNow(T valueIfAbsent),立即获取结果不阻塞:计算完,返回计算完成后的结果;没算完,返回设定的valueAbsent

主动触发计算:

  • public boolean complete(T value),是否立即打断get()方法返回括号值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);//执行需要2秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task over";
});

try {
TimeUnit.SECONDS.sleep(1);//等待需要1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println(completableFuture.getNow("replacement value"));//执行时间超过等待时间,返回括号里的值

//执行时间超过等待时间,返回true + 括号里的值
//执行时间小于等待时间,返回false + completableFuture的返回值
System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.get());

}
}

对计算结果进行处理

  • thenApply 计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("111");
return 1024;
}).thenApply(f -> {
System.out.println("222");
return f + 1;
}).thenApply(f -> {
//int age = 10/0; // 异常情况:那步出错就停在那步。
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v, e) -> {
System.out.println("*****v: " + v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});

System.out.println("-----主线程结束,END");

// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
  • handle 类似于thenApply,但是有异常的话仍然可以往下走一步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Main {
public static void main(String[] args) {
// 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("111");
return 1024;
}).handle((f, e) -> {
int i = 10 / 0;
System.out.println("222");
return f + 1;
}).handle((f, e) -> {
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v, e) -> {
System.out.println("*****v: " + v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});

System.out.println("-----主线程结束,END");

// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

执行结果:

—–主线程结束,END
111
333
*****v: null
java.util.concurrent.CompletionException: java.lang.NullPointerException


对计算结果进行消费

  • thenAccept接收任务的处理结果,并消费处理,无返回结果
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(f -> {
return f + 2;
}).thenApply(f -> {
return f + 3;
}).thenApply(f -> {
return f + 4;
}).thenAccept(System.out::println);
}
}

补充:Code之任务之间的顺序执行:

  • thenRun(Runnable runnable):任务A执行完执行B,并且B不需要A的结果

  • thenAccept(Consumer action):任务A执行完执行B,B需要A的结果,但是任务B无返回值

  • thenApply(Function fn):任务A执行完执行B,B需要A的结果,同时任务B有返回值

对计算速度进行选用

  • applyToEither谁快用谁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});

CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});

CompletableFuture<Integer> completableFuture = completableFuture1.applyToEither(completableFuture2, f -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
return f + 1;//输出21
});

System.out.println(Thread.currentThread().getName() + "\t" + completableFuture.get());
}

}

对计算结果进行合并

两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给 thenCombine 来处理,先完成的先等着,等待其它分支任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
return 20;
}), (x, y) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
return x + y;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
return 30;
}), (a, b) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
return a + b;
});
System.out.println("-----主线程结束,END");
System.out.println(thenCombineResult.get());


// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

运行结果:
ForkJoinPool.commonPool-worker-1 —come in 1
ForkJoinPool.commonPool-worker-1 —come in 2
main —come in 3
ForkJoinPool.commonPool-worker-2 —come in 4
main —come in 5
—–主线程结束,END
60

线程池运行选择

以thenRun和thenRunAsync为例,有什么区别?

  1. 没有传入自定义线程池,都用默认线程池ForkJoinPool(参考本章的四大静态方法)

  2. 如果执行第一个任务的时候,传入了一个自定义线程池

    1. 调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池(参考本章的四大静态方法)
    2. 调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池,后面的任务(thenRun也一样)也是使用ForkJoin线程池。
    3. 也有可能处理太快,系统优化切换原则,直接使用main线程处理

2.2 case:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Main {
public static void main(String[] args){
ExecutorService threadPool = Executors.newFixedThreadPool(5);
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
return "abcd";
},threadPool).thenRunAsync(()->{
try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
}).thenRunAsync(()->{
try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
}).thenRunAsync(()->{
try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
});
}
}

运行结果:

1号任务 pool-1-thread-1
2号任务 ForkJoinPool.commonPool-worker-1
3号任务 ForkJoinPool.commonPool-worker-1
4号任务 ForkJoinPool.commonPool-worker-1


2.3 case:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Main {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("1号任务" + "\t" + Thread.currentThread().getName());
return "abcd";
}, threadPool).thenRun(() -> {
System.out.println("2号任务" + "\t" + Thread.currentThread().getName());
}).thenRun(() -> {
System.out.println("3号任务" + "\t" + Thread.currentThread().getName());
}).thenRun(() -> {
System.out.println("4号任务" + "\t" + Thread.currentThread().getName());
});
}
}

运行结果:

1号任务 pool-1-thread-1
2号任务 main
3号任务 main
4号任务 main