
本文介绍如何利用 eclipse vert.x mutiny 的 `multi` 和 `uni` 组合,对元素列表进行**严格顺序调用异步方法**,并在任一环节失败时自动降级并继续后续处理。
在响应式编程中,「顺序执行多个异步操作」是一个常见但易被误解的需求:很多人误用 flatMap(并行)或 concatMap(虽有序但默认中断失败流),而实际需要的是串行、容错、无中断的异步链式调用。
Mutiny 提供了简洁优雅的解决方案——transformToUniAndConcatenate,它会将每个元素映射为一个 Uni,并按原始迭代顺序依次订阅、等待完成后再处理下一个,天然满足“前一个结束 → 启动下一个”的语义。
以下是完整实现示例:
import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; // 假设 Element 是自定义类型,asyncMethod 返回 Uniprivate Uni asyncMethod(Element element) { // 模拟异步逻辑(如 HTTP 调用、DB 查询) return Uni.createFrom().item("result-for-" + element.getId()) .onItem().delayIt().by(Duration.ofMillis(100)); // 演示延迟 } // 顺序执行 + 失败不中断 public Multi executeSequentiallyWithFallback(List elements) { return Multi.createFrom().iterable(elements) .onItem() .transformToUniAndConcatenate(element -> asyncMethod(element) .onFailure() // 捕获任意 Throwable(含 checked exception) .recoverWithItem("fallback-for-" + element.getId()) // 降级返回固定值 // 或使用 .continueWith(() -> Uni.createFrom().item("...")) 实现更复杂兜底 ); }
✅ 关键点说明:
- transformToUniAndConcatenate 是核心:它确保 Uni 逐个创建、串行等待、结果扁平化为 Multi
; - onFailure().recoverWithItem(...) 实现失败兜底,避免整个流终止;
- 若需记录错误日志,可搭配 .onFailure().invoke(Throwable::printStackTrace);
- 不要使用 transformToUniAndFlatMap(等价于并行)或 transformToUniAndMerge(无序合并)。
⚠️ 注意事项:
- 该方案不支持并发控制(如“最多同时执行 3 个”),如需限流,请改用 transformToUniAndConcatenate + 自定义信号协调,或切换至 Multi#concatMap 配合 runSubscriptionOn() 控制线程;
- recoverWithItem 仅对 Uni 内部异常生效;若 asyncMethod 本身抛出未捕获运行时异常,仍会被捕获并触发 fallback;
- 返回类型为 Multi
,如需最终聚合(如 List ),可追加 .collect().asList()。
通过这一模式,你既能保证业务逻辑的执行时序性,又具备生产级的健壮性——一次失败,全局无忧。










