不能用 channel 直接实现观察者模式,因为单个 channel 是点对点通信,无法保证多个观察者同时收到同一事件;必须通过中心事件 channel + 分发 goroutine + 每观察者独立带缓冲 channel 才能正确广播。

为什么不用 channel 做观察者模式的事件广播
channel 本身不是为一对多广播设计的,select + range 无法让多个 goroutine 同时收到同一份消息。直接把一个 chan interface{} 给所有观察者共用,会导致消息被某一个接收者取走后其他观察者收不到——这违背观察者模式“通知所有订阅者”的语义。
用 channel 实现观察者必须引入中间调度 goroutine
核心思路是:维护一个注册表(比如 map[*observer]chan interface{}),所有事件先发到一个中心 chan event,再由单独的分发 goroutine 拷贝并广播到每个观察者的专属 channel 中。否则无法保证送达和解耦。
实操建议:
- 观察者注册时应返回一个取消函数,用于从 map 中安全删除对应
chan,避免内存泄漏 - 每个观察者 channel 建议带缓冲(如
make(chan interface{}, 16)),防止分发 goroutine 因某个观察者阻塞而拖垮全局 - 分发逻辑里要加
select+default,跳过已满或已关闭的观察者 channel,避免死锁
type Event struct{ Topic string; Data interface{} }
type Observer struct{ ch chan interface{} }
func NewObserver() *Observer {
return &Observer{ch: make(chan interface{}, 16)}
}
func (o *Observer) Chan() <-chan interface{} { return o.ch }
立即学习“go语言免费学习笔记(深入)”;
type EventBus struct {
mu sync.RWMutex
obs map[*Observer]struct{}
events chan Event
}
func NewEventBus() EventBus {
eb := &EventBus{
obs: make(map[Observer]struct{}),
events: make(chan Event, 128),
}
go eb.dispatch()
return eb
}
func (eb EventBus) Subscribe(o Observer) func() {
eb.mu.Lock()
eb.obs[o] = struct{}{}
eb.mu.Unlock()
return func() {
eb.mu.Lock()
delete(eb.obs, o)
eb.mu.Unlock()
close(o.ch)
}}
func (eb *EventBus) Publish(e Event) {
select {
case eb.events
func (eb *EventBus) dispatch() {
for e := range eb.events {
eb.mu.RLock()
for o := range eb.obs {
select {
case o.ch
注意 channel 关闭与观察者退出的竞态
观察者 goroutine 如果在读取 o.Chan() 时未做 ok 判断,一旦被取消且 channel 关闭,就会 panic。常见错误写法:for e := range o.Chan() —— 这里 o.Chan() 返回的是只读 channel,但底层仍可能被关闭;必须配合 for { select { case e, ok := 。
另一个坑是:如果观察者处理太慢,缓冲区满后 select 的 default 分支会让消息丢失。业务上是否允许丢弃、是否需要背压(如用 semaphore 控制并发消费),得根据场景定。
替代方案:考虑用第三方库或更轻量结构
标准库没有内置观察者抽象,但像 github.com/robfig/pat 或 go.uber.org/zap 的 logger hook 其实都用了类似机制。如果你只是想做日志监听、配置变更通知等,用 sync.Map + chan struct{} 配合 atomic 标记状态,比全量复制 event 到每个 channel 更省资源。
真正复杂的状态广播(如带优先级、过滤、重试)不建议硬啃 channel 调度逻辑——容易写出难以测试的 goroutine 网状依赖。这时候该上消息队列或用 golang.org/x/exp/slices 做切片遍历 + context 控制超时。










