subscribe()是Flux/Mono生命周期起点,触发onSubscribe/onNext/onComplete或onError;无订阅则无执行,操作符仅组装链路;冷序列用cache()/share()复用,热序列需显式管理生命周期。

订阅机制:理解 Flux/Mono 的生命周期起点
Reacto r 中的 subscribe() 不是简单触发数据流,而是启动整个响应式链的生命周期。调用它时,会依次触发 onSubscribe()(传递 Subscription)、onNext()、onComplete() 或 onError()。关键点在于:没有 subscribe(),就没有实际执行 —— 所有操作符(如 map、filter)只是组装链路,不会触发任何计算或 I/O。
常见误区是多次调用 subscribe() 导致重复执行(比如 HTTP 请求被发多次)。解决方式包括:
- 对冷序列(Cold Publisher,如
Mono.fromCallable()、Flux.range())使用cache()或share()实现多订阅复用 - 对热序列(Hot Publisher,如
Flux.create()配合publish().refCount())显式管理生命周期 - 避免在业务逻辑中隐式重复订阅 —— 推荐将数据流封装为方法返回值,由最终调用方统一订阅
错误处理:分层拦截与语义化恢复
Reacto r 提倡“错误即数据”,不鼓励 try-catch 包裹流。正确做法是在链路上用声明式操作符提前捕获并转化异常:
-
onErrorResume():替换异常为替代数据(如 fallback 值),适合非关键路径降级 -
onErrorReturn():直接返回固定值,适用于兜底场景(如查不到用户时返回默认头像) -
onErrorMap():将原始异常映射为更明确的业务异常(如把HttpClientException转成UserNotFoundException) -
retry()和retryWhen():控制重试逻辑;注意避免无条件重试导致雪崩,建议配合指数退避(Retry.backoff(3, Duration.ofSeconds(1)))
全局错误兜底可用 doOnError() 记录日志,但不要在这里做恢复 —— 恢复逻辑应放在更上游、语义更清晰的位置。
背压:面向下游消费能力的设计约束
背压不是 Reactor 特有机制,而是 Reactive Streams 规范的核心 —— 它要求下游能主动告知上游“我还能处理多少”。Flux 默认支持背压(Mono 忽略,因其最多只发 1 个元素),但必须正确使用才能生效:
- 避免在链中混入不支持背压的操作:如
block()、toStream()、或未指定策略的publishOn()(它默认使用Queues.SMALL_BUFFER_SIZE,可能丢弃请求) - 对高吞吐场景,显式配置缓冲区大小:
publishOn(scheduler, bufferSize)或使用limitRate(n)控制每批请求数 - 数据库/HTTP 客户端需选用支持背压的驱动(如 R2DBC、WebClient),否则上游控速无效
- 自定义
Flux.create()时,务必调用subscription.request(n)响应下游需求,不可自行发数据
组合实践:一个典型 Web API 场景
以 Spring WebFlux 中查询用户为例:
- 用
WebClient.get().retrieve().bodyToMono(User.class)发起请求(天然支持背压) - 加
timeout(Duration.ofSeconds(3))防止无限等待 - 失败时用
onErrorMap(WebClientResponseException::getStatus, this::mapToBusinessException)统一异常语义 - 降级走缓存:
onErrorResume(e -> cacheService.findUser(id).onErrorComplete()) - 最终订阅由框架完成(Controller 返回 Mono),不手动调用 subscribe()
这种写法让错误可预测、资源可控、扩展清晰,也便于后续接入熔断(如 Resilience4j)或指标观测。










