ExecutorCompletionService的核心是按完成顺序获取结果,避免普通Future轮询时因首个任务未完成而阻塞;它用阻塞队列缓存结果,支持take()阻塞取或poll()非阻塞取,并建议封装任务上下文以追踪来源与耗时。

Java中用 ExecutorCompletionService 收集异步结果,核心是“任务提交后不等顺序,谁先完成谁先取”,特别适合结果到达时间不确定、且需要尽快处理已完成任务的场景。
为什么不用普通 Future 列表轮询?
直接用 ExecutorService.submit() 得到一堆 Future,再循环调用 get() 会阻塞在第一个没完成的任务上——哪怕后面几个早已跑完。而 CompletionService 把“完成”这件事单独拎出来管理,底层用阻塞队列按完成顺序缓存结果,避免空等。
基本用法:三步走
1. 创建线程池(如 Executors.newFixedThreadPool(4))
2. 包装成 ExecutorCompletionService:
new ExecutorCompletionService(executor)
3. 提交任务(submit(Runnable, T) 或 submit(Callable),然后反复调用 take() 或 poll()
-
take():阻塞直到有结果,适合“必须等结果”的场景 -
poll():立即返回,为空就继续干别的,适合非阻塞调度
典型聚合模式:收集全部结果并带序号/来源标识
单纯取结果容易丢失“哪个任务完成了”,建议提交时用封装类携带上下文:
record TaskResult(String taskId, T data, long startTime) {}
提交时:
立即学习“Java免费学习笔记(深入)”;
completionService.submit(() -> {
var result = doHeavyWork();
return new TaskResult<>("task-1", result, System.nanoTime());
});
取结果时就能清晰知道是谁、耗时多少,方便后续聚合、日志或降级处理。
异常处理不能漏
Future.get() 可能抛出 ExecutionException(包装了原始异常),必须捕获并检查 getCause():
try {
TaskResult r = completionService.take().get();
handleSuccess(r);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
handleError(cause); // 比如记录日志、触发重试
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
基本上就这些。关键不是“怎么写”,而是理解它解决的是“完成顺序不可控”下的结果消费问题——不复杂但容易忽略。










