酒馆冒险
49.49MB · 2025-11-07
CompletableFuture原理与实践详解
在现代软件开发中,随着系统复杂度和并发需求的增加,异步编程变得尤为重要。Java 8引入的CompletableFuture类,为开发者提供了强大的工具,以简化异步任务的管理和组合。本文将探讨CompletableFuture的使用方法。
在Java 8之前,异步编程主要依赖于Future接口。然而,Future存在以下局限性:
get()方法时,线程会被阻塞,直到结果可用。这会导致线程资源的浪费,影响系统的并发性能。Future提供的支持不够完善。为了解决这些问题,Java 8引入了CompletableFuture,它实现了CompletionStage和Future接口,提供了更丰富的API,支持异步回调和任务组合。CompletableFuture具有以下优势:
thenApply、thenCombine、thenCompose,allOf、anyOf等。exceptionally、handle等方法,方便地处理异步任务中的异常情况。CompletableFuture提供了两种方法来创建异步任务:
// 使用默认线程池(ForkJoinPool.commonPool())
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 执行异步操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "异步任务结果";
});
// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "使用自定义线程池的结果";
}, executor);
// 使用默认线程池
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 执行异步操作,不返回结果
System.out.println("执行异步任务");
});
// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("使用自定义线程池执行");
}, executor);
注意:在实际生产环境中,建议使用自定义线程池,因为默认的ForkJoinPool.commonPool()在高并发场景下可能无法满足需求。
CompletableFuture提供了丰富的回调方法,以便在任务完成后执行特定操作:
thenApply方法接收一个函数,将前一个阶段的结果转换为新的结果。它有两个版本:
thenApply:在完成当前任务的同一个线程中执行thenApplyAsync:在新的线程中执行(使用默认线程池或自定义线程池)CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
// 同步版本
CompletableFuture<String> result1 = future.thenApply(s -> {
return s + " World";
});
// 异步版本(使用默认线程池)
CompletableFuture<String> result2 = future.thenApplyAsync(s -> {
return s + " World";
});
// 异步版本(使用自定义线程池)
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> result3 = future.thenApplyAsync(s -> {
return s + " World";
}, executor);
thenAccept方法接收一个消费者函数,用于消费前一个阶段的结果,但不返回新值。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello World";
});
// 同步版本
future.thenAccept(result -> {
System.out.println("消费结果: " + result);
});
// 异步版本
future.thenAcceptAsync(result -> {
System.out.println("消费结果: " + result);
});
// 异步版本(使用自定义线程池)
ExecutorService executor = Executors.newFixedThreadPool(10);
future.thenAcceptAsync(result ->{
System.out.println("消费结果:" + result);
},executor);
thenRun方法接收一个Runnable,在前一个阶段完成后执行,但不关心前一个阶段的结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello World";
});
// 同步版本
future.thenRun(() -> {
System.out.println("任务完成后执行清理操作");
});
// 异步版本
ExecutorService executor = Executors.newFixedThreadPool(10);
future.thenRunAsync(() -> {
System.out.println("任务完成后执行清理操作");
},executor);
thenCompose方法用于将两个异步操作串联起来,前一个操作的结果作为后一个操作的输入。这类似于流式处理。(与下面的thenCombine区别在于:这个方法主要用于异步任务的串行编排。简单来说,thenCompose() 用于将一个异步任务的结果作为输入,继续发起一个新的异步任务,形成一条异步流水线。thenCompose() 把两个 Future 合并成一个连续的 Future,而不是thenCombine()形成的嵌套 CompletableFuture<CompletableFuture>)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
CompletableFuture<String> future2 = future1.thenCompose(result -> {
return CompletableFuture.supplyAsync(() -> {
return result + " World";
});
});
// 结果:future2 的值为 "Hello World"
thenCombine方法用于合并两个并行的异步任务的结果。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "World";
});
CompletableFuture<String> combined = future1.thenCombine(future2, (result1, result2) -> {
return result1 + " " + result2;
});
// 结果:combined 的值为 "Hello World"
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf2 do something....");
return 2;
});
CompletableFuture<Void> cf3 = cf1.thenAcceptBoth(cf2, (a, b) -> {
System.out.println(Thread.currentThread() + " cf3 do something....");
System.out.println(a + b);
});
runAfterBoth没有入参,也没有返回值。两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf2 do something....");
return 2;
});
CompletableFuture<Void> cf3 = cf1.runAfterBoth(cf2, () -> {
System.out.println(Thread.currentThread() + " cf3 do something....");
});
allOf方法创建一个新的CompletableFuture,当所有给定的CompletableFuture都完成时,它才会完成。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "结果1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "结果2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
return "结果3";
});
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
// 等待所有任务完成
allFutures.thenRun(() -> {
System.out.println("所有任务已完成");
// 获取各个任务的结果
String result1 = future1.join();
String result2 = future2.join();
String result3 = future3.join();
System.out.println("结果1: " + result1);
System.out.println("结果2: " + result2);
System.out.println("结果3: " + result3);
});
anyOf方法创建一个新的CompletableFuture,当任意一个给定的CompletableFuture完成时,它就会完成。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "结果1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "结果2";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
anyFuture.thenAccept(result -> {
System.out.println("最快完成的任务结果: " + result);
// 输出:"最快完成的任务结果: 结果2"
});
CompletableFuture提供了完善的异常处理机制:
exceptionally方法用于处理异常情况,当任务抛出异常时,会执行提供的异常处理函数,并返回一个默认值。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟可能抛出异常的操作
if (Math.random() > 0.5) {
throw new RuntimeException("发生异常");
}
return "成功";
});
CompletableFuture<String> result = future.exceptionally(ex -> {
System.out.println("捕获异常: " + ex.getMessage());
return "默认值";
});
// 无论是否发生异常,都能获取到结果
String value = result.join();
handle方法可以同时处理正常结果和异常情况,无论任务成功还是失败,都会执行提供的处理函数。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("发生异常");
}
return "成功";
});
CompletableFuture<String> result = future.handle((value, ex) -> {
if (ex != null) {
System.out.println("处理异常: " + ex.getMessage());
return "异常时的默认值";
}
return "处理后的值: " + value;
});
whenComplete方法类似于handle,但它只用于观察结果和异常,不能修改返回的值。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("发生异常");
}
return "成功";
});
CompletableFuture<String> result = future.whenComplete((value, ex) -> {
if (ex != null) {
System.out.println("观察到异常: " + ex.getMessage());
} else {
System.out.println("观察结果: " + value);
}
});
抛出经检查的异常(Checked Exceptions) :
包括 InterruptedException 和 ExecutionException。这意味着你需要在方法签名中声明这些异常或在调用处捕获它们。
支持超时参数:允许你指定等待结果的最大时间,这对于避免无限期等待某些操作完成特别有用。
try {
String result = future.get(1, TimeUnit.SECONDS); // 等待最多1秒
} catch (InterruptedException e) {
// 当前线程被中断时执行
} catch (ExecutionException e) {
// 当Future完成时发生异常
} catch (TimeoutException e) {
// 超过指定时间未完成任务
}
getNow方法用于立即获取结果,如果任务尚未完成,则返回提供的默认值。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "结果";
});
// 立即尝试获取结果,如果未完成则返回默认值
String result = future.getNow("默认值");
join方法类似于get(),但不会抛出检查异常(ExecutionException、InterruptedException),只会抛出运行时异常。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "结果";
});
// 等待结果,不抛出检查异常
String result = future.join();
complete方法用于手动完成CompletableFuture,如果任务尚未完成,则将其设置为已完成状态,并返回结果。
CompletableFuture<String> future = new CompletableFuture<>();
// 手动完成Future
future.complete("手动完成的结果");
String result = future.join(); // "手动完成的结果"
completeExceptionally方法用于手动完成CompletableFuture,但设置为异常状态。
CompletableFuture<String> future = new CompletableFuture<>();
// 手动设置为异常状态
future.completeExceptionally(new RuntimeException("手动设置异常"));
try {
future.join();
} catch (Exception e) {
System.out.println("捕获异常: " + e.getMessage());
}
CompletableFuture的内部实现依赖于以下几个核心组件:
CompletableFuture内部维护一个volatile Object result字段,用于存储任务的结果。这个字段可以是:
AltResult中)CompletableFuture本身(用于依赖关系)CompletableFuture维护一个volatile Completion stack字段,这是一个栈结构的链表,用于存储所有依赖于当前Future完成的后续操作(Completion)。
// 简化的内部结构
static abstract class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // 链表中的下一个节点
abstract CompletableFuture<?> tryFire(int mode);
abstract boolean isLive();
}
当调用thenApply、thenAccept等方法时,CompletableFuture会创建一个新的Completion节点,并将其添加到当前Future的stack链表中:
// 简化的thenApply实现逻辑
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
CompletableFuture<U> dep = new CompletableFuture<U>();
Completion c = new UniApply<T,U>(this, dep, fn);
// 将c压入stack栈
push(c);
// 尝试触发执行
c.tryFire(SYNC);
return dep;
}
CompletableFuture通过内部的Completion机制实现非阻塞的结果获取:
complete或completeExceptionally方法。stack中的所有Completion节点。Completion节点调用tryFire方法,尝试执行后续的回调操作。CompletableFuture使用ForkJoinPool.commonPool()作为默认的线程池。Executor,以更好地控制线程资源。CompletableFuture的异步执行主要通过以下方式实现:
supplyAsync、runAsync等方法时,会将任务提交到线程池执行。thenApplyAsync等方法时,回调函数会在新的线程中执行。ForkJoinPool使用工作窃取算法,可以更高效地利用多核CPU。美团技术团队在外卖商家端API服务的异步化改造中,成功应用了CompletableFuture,显著提升了系统性能。
改造前,系统采用同步调用方式,从多个下游服务获取数据:
// 改造前的同步调用方式
public ApiResponse getShopInfo(Long shopId) {
// 同步调用多个下游服务
ShopBasicInfo basicInfo = shopService.getBasicInfo(shopId); // 耗时 50ms
ShopStatistics statistics = shopService.getStatistics(shopId); // 耗时 100ms
ShopActivity activity = activityService.getActivity(shopId); // 耗时 80ms
// 组装返回结果
return ApiResponse.success()
.setBasicInfo(basicInfo)
.setStatistics(statistics)
.setActivity(activity);
}
存在的问题:
改造后,使用CompletableFuture实现并行调用:
// 改造后的异步调用方式
public ApiResponse getShopInfo(Long shopId) {
// 并行调用多个下游服务
CompletableFuture<ShopBasicInfo> basicInfoFuture =
CompletableFuture.supplyAsync(() ->
shopService.getBasicInfo(shopId), executor);
CompletableFuture<ShopStatistics> statisticsFuture =
CompletableFuture.supplyAsync(() ->
shopService.getStatistics(shopId), executor);
CompletableFuture<ShopActivity> activityFuture =
CompletableFuture.supplyAsync(() ->
activityService.getActivity(shopId), executor);
// 等待所有任务完成并组装结果
CompletableFuture<ApiResponse> responseFuture =
CompletableFuture.allOf(basicInfoFuture, statisticsFuture, activityFuture)
.thenApply(v -> {
try {
ShopBasicInfo basicInfo = basicInfoFuture.get();
ShopStatistics statistics = statisticsFuture.get();
ShopActivity activity = activityFuture.get();
return ApiResponse.success()
.setBasicInfo(basicInfo)
.setStatistics(statistics)
.setActivity(activity);
} catch (Exception e) {
throw new RuntimeException("获取数据失败", e);
}
})
.exceptionally(ex -> {
// 异常处理
log.error("获取店铺信息失败", ex);
return ApiResponse.error("获取店铺信息失败");
});
return responseFuture.join();
}
改造效果:
在实际场景中,还可以实现部分失败的容错机制:
public ApiResponse getShopInfo(Long shopId) {
CompletableFuture<ShopBasicInfo> basicInfoFuture =
CompletableFuture.supplyAsync(() ->
shopService.getBasicInfo(shopId), executor)
.exceptionally(ex -> {
log.warn("获取基础信息失败,使用默认值", ex);
return ShopBasicInfo.defaultInfo();
});
CompletableFuture<ShopStatistics> statisticsFuture =
CompletableFuture.supplyAsync(() ->
shopService.getStatistics(shopId), executor)
.exceptionally(ex -> {
log.warn("获取统计信息失败,使用默认值", ex);
return ShopStatistics.empty();
});
CompletableFuture<ShopActivity> activityFuture =
CompletableFuture.supplyAsync(() ->
activityService.getActivity(shopId), executor)
.exceptionally(ex -> {
log.warn("获取活动信息失败,使用默认值", ex);
return ShopActivity.empty();
});
return CompletableFuture.allOf(basicInfoFuture, statisticsFuture, activityFuture)
.thenApply(v -> {
try {
return ApiResponse.success()
.setBasicInfo(basicInfoFuture.get())
.setStatistics(statisticsFuture.get())
.setActivity(activityFuture.get());
} catch (Exception e) {
throw new RuntimeException("组装数据失败", e);
}
})
.join();
}
在RPC框架中,CompletableFuture可以用于优化客户端接收服务端返回结果的处理方式。
// 客户端发送请求
public Object sendRequest(RpcRequest request) {
// 将请求ID和响应结果绑定到Channel的AttributeMap
channel.attr(AttributeKey.valueOf(request.getRequestId()))
.set(new AtomicReference<RpcResponse>());
channel.writeAndFlush(request);
// 阻塞等待响应
channel.closeFuture().sync();
// 从AttributeMap获取结果
AtomicReference<RpcResponse> responseRef = channel.attr(
AttributeKey.valueOf(request.getRequestId())).get();
return responseRef.get().getData();
}
问题:需要手动阻塞等待,代码不清晰。
// 客户端发送请求
public CompletableFuture<RpcResponse> sendRequest(RpcRequest request) {
CompletableFuture<RpcResponse> future = new CompletableFuture<>();
// 将future存储到未处理请求的Map中
unprocessedRequests.put(request.getRequestId(), future);
// 发送请求
channel.writeAndFlush(request);
return future;
}
// 客户端处理器接收响应
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof RpcResponse) {
RpcResponse response = (RpcResponse) msg;
CompletableFuture<RpcResponse> future =
unprocessedRequests.remove(response.getRequestId());
if (future != null) {
// 完成Future,触发回调
future.complete(response);
}
}
}
// 使用方式
CompletableFuture<RpcResponse> future = rpcClient.sendRequest(request);
RpcResponse response = future.get(); // 或者使用future.thenAccept()进行异步处理
优势:
在高并发场景下,默认的ForkJoinPool.commonPool()可能无法满足需求,建议使用自定义线程池:
// 创建自定义线程池
ExecutorService executor = new ThreadPoolExecutor(
10, // 核心线程数
50, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(1000), // 任务队列
new ThreadFactoryBuilder() // 线程工厂
.setNameFormat("async-task-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 使用自定义线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "结果";
}, executor);
线程池大小的设置需要考虑:
// 计算合适的线程数
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maxPoolSize = corePoolSize * 2;
ExecutorService executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000)
);
异步任务中的异常可能被吞噬,必须进行适当的处理:
// 错误示例:可能忽略异常
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 可能抛出异常
return riskyOperation();
});
future.thenAccept(result -> {
// 如果上面抛出异常,这里不会执行
System.out.println(result);
});
// 正确示例:处理异常
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return riskyOperation();
})
.exceptionally(ex -> {
log.error("操作失败", ex);
return "默认值";
})
.thenAccept(result -> {
System.out.println(result);
});
对于复杂的异常处理场景,可以使用handle方法:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return riskyOperation();
})
.handle((result, ex) -> {
if (ex != null) {
// 处理异常
log.error("操作失败", ex);
// 根据异常类型返回不同的默认值
if (ex instanceof TimeoutException) {
return "超时默认值";
} else {
return "通用默认值";
}
}
// 处理正常结果
return processResult(result);
});
在CompletableFuture的异步任务中,应该避免长时间阻塞的操作:
// 错误示例:在异步任务中阻塞
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 阻塞调用,占用线程池线程
return blockingDatabaseCall();
}, executor);
// 正确示例:将阻塞操作放在专门的线程池
ExecutorService blockingExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("blocking-task-%d")
.build()
);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return blockingDatabaseCall();
}, blockingExecutor);
如果需要使用get()方法,应该设置超时时间:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return longRunningOperation();
});
try {
// 设置超时时间,避免无限等待
String result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// 处理超时
future.cancel(true);
log.error("操作超时", e);
} catch (Exception e) {
log.error("操作失败", e);
}
过长的任务链会增加调试和维护的复杂度,应该合理拆分:
// 错误示例:任务链过长
CompletableFuture<String> result = future1
.thenApply(v1 -> process1(v1))
.thenApply(v2 -> process2(v2))
.thenApply(v3 -> process3(v3))
.thenApply(v4 -> process4(v4))
.thenApply(v5 -> process5(v5))
.thenApply(v6 -> process6(v6));
// 正确示例:合理拆分
CompletableFuture<String> stage1 = future1
.thenApply(v1 -> process1(v1))
.thenApply(v2 -> process2(v2))
.thenApply(v3 -> process3(v3));
CompletableFuture<String> stage2 = stage1
.thenApply(v4 -> process4(v4))
.thenApply(v5 -> process5(v5))
.thenApply(v6 -> process6(v6));
根据业务需求,选择合适的同步或异步方法:
// 如果后续操作是轻量级的,可以使用同步方法
CompletableFuture<String> result = future
.thenApply(v -> v.toUpperCase()) // 轻量级操作,使用同步方法
// 如果后续操作是重量级的,应该使用异步方法
CompletableFuture<String> result2 = future
.thenApplyAsync(v -> heavyOperation(v), executor); // 重量级操作,使用异步方法
如果任务不再需要,应该及时取消,释放资源:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return longRunningOperation();
});
// 如果任务不再需要
if (someCondition) {
future.cancel(true); // 取消任务
}
在应用关闭时,应该关闭自定义的线程池:
@PreDestroy
public void destroy() {
if (executor != null && !executor.isShutdown()) {
executor.shutdown();
try {
// 等待任务完成,最多等待30秒
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
// 强制关闭
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
CompletableFuture作为Java 8引入的异步编程工具,为开发者提供了强大的功能,以简化异步任务的管理和组合。通过合理地使用CompletableFuture,可以:
在实际应用中,需要注意:
通过掌握CompletableFuture的原理和使用方法,结合实际业务场景进行合理应用,可以构建高效、稳定的异步系统。