ZeroMQ的PUB/SUB与REQ/REP是两种根本不同的通信模式:前者单向广播、无连接、需显式订阅;后者双向配对、强顺序、send/recv必须严格交替,socket类型不可混用或复用。

ZeroMQ 的 C++ 发布订阅(PUB/SUB)和请求响应(REQ/REP)模式,本质是两种完全不同的通信契约:前者是单向广播、无连接、无确认;后者是双向配对、隐式同步、强顺序。选错模式或混用语义,90% 的“收不到消息”“卡死”“乱序”问题都源于此。
zmq::socket_t 类型必须严格匹配模式,且不能复用
ZeroMQ 不允许一个 socket 同时承担多种角色。比如用 ZMQ_REQ socket 去 connect 一个 ZMQ_PUB 端口,程序不会报错,但永远收不到任何数据——因为协议层根本不兼容。
-
ZMQ_REQ只能 connect 到ZMQ_REP(或ZMQ_ROUTER),且每次send后必须紧跟recv,否则下一次send会阻塞 -
ZMQ_PUB只能 bind(不建议 connect),ZMQ_SUB只能 connect(不建议 bind),且ZMQ_SUB必须调用setsockopt(ZMQ_SUBSCRIBE, ...)才能收到消息,空订阅("")在较新版本中默认不生效 - 一个
zmq::context_t可创建多个 socket,但每个 socket 的类型和行为由构造时第二个参数唯一决定,不可运行时变更
发布订阅模式中,zmq_setsockopt(..., ZMQ_SUBSCRIBE, ...) 是必填项
这是新手最常踩的坑:SUB 端代码写了 connect,也写了 recv,但始终收不到 PUB 发出的消息。根本原因就是漏了订阅动作——ZeroMQ 的 SUB socket 默认屏蔽所有消息,哪怕 PUB 已经在发。
- 订阅空字符串:
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);表示接收所有主题(注意长度传 0,不是 1) - 订阅特定前缀:
std::string topic = "stock.AAPL"; subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size()); - 可多次调用
setsockopt(ZMQ_SUBSCRIBE, ...)实现多主题订阅,但不支持通配符(如"stock.*")——需靠应用层过滤 - 取消订阅用
setsockopt(ZMQ_UNSUBSCRIBE, ...),但实际中极少需要
请求响应模式里,send 和 recv 必须严格交替,且不能跳过响应
ZMQ_REQ socket 内部维护一个“请求-响应”状态机。一旦发出请求未收到回复,socket 就会进入不可用状态,后续 send 直接返回 false 或抛异常(取决于 cppzmq 版本)。
立即学习“C++免费学习笔记(深入)”;
- 服务端
ZMQ_REP也必须先recv再send,否则客户端会一直等待 - 不要试图在一个 REQ socket 上并发发多个请求——它不支持异步 I/O;如需并发,请用多个 socket 或改用
ZMQ_DEALER+ZMQ_ROUTER - 超时控制推荐用
setsockopt(ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms)),避免无限阻塞;但注意:ZMQ_SNDTIMEO对 REQ socket 无效
#include#include #include // 客户端:REQ 模式(务必配对 recv) int main() { zmq::context_t ctx; zmq::socket_t sock(ctx, ZMQ_REQ); sock.connect("tcp://localhost:5555"); // 设置接收超时,防止永久卡住 int timeout_ms = 3000; sock.setsockopt(ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms)); zmq::message_t req(5); memcpy(req.data(), "HELLO", 5); sock.send(req); zmq::message_t reply; if (sock.recv(reply) == 0) { // 返回 0 表示成功 std::cout << "Got: " << std::string(static_cast (reply.data()), reply.size()) << "\n"; } else { std::cerr << "Timeout or error\n"; } }
PUB/SUB 的连接时机与消息丢失风险必须手动管理
PUB 端 bind 后立刻开始发消息,但 SUB 端 connect 是异步建立的,中间存在“连接窗口期”。若 PUB 在 SUB 还没连上来就已发消息,这些消息将永久丢失——ZeroMQ 默认不缓存(不像 Kafka)。这不是 bug,是设计选择。
- 解决方案一(简单):PUB 启动后 sleep 几百毫秒,再开始发;或让 SUB 先启动、等几秒再启 PUB
- 解决方案二(健壮):引入一个“握手” REQ/REP socket,SUB 连上后发个注册请求,PUB 收到才开始广播
- 解决方案三(高级):用
ZMQ_XPUB+ZMQ_XSUB设备做代理,并启用ZMQ_XPUB_VERBOSE监听订阅事件,实现动态流控 - 注意:
ZMQ_CONFLATE可开启“只保留最新消息”,但仅对单个订阅者有效,且不解决初始连接丢失问题
真正难的不是写通代码,而是理解 ZeroMQ 每种 socket 类型背后的状态机和契约约束。REQ/REP 要求严格配对,PUB/SUB 要求显式订阅+容忍丢消息——把它们当普通 TCP socket 用,一定会掉进坑里。











