C++调用paho-mqtt-cpp异步发布需使用mqtt::async_client,连接成功后调用publish()立即返回,须预先设置connection_lost_callback和delivery_complete_callback,发布时传mqtt::message对象且主题为std::string。

如何用 C++ 调用 paho-mqtt-cpp 实现异步发布
直接使用 mqtt::async_client 类,它默认走异步路径;同步发布(publish())会阻塞,而异步必须配合 delivery_complete_callback 和 connection_lost_callback 才能真正“不卡主线程”。
关键点:异步发布调用 publish() 后立即返回,消息实际发送由底层线程池处理;但你必须在连接成功后才调用,否则抛 std::runtime_error(错误信息含 "not connected")。
- 连接前先设置回调:
cli.set_connected_callback()、cli.set_connection_lost_callback() - 发布时必须传
mqtt::message对象,不能只传字符串;主题名需为std::string,不能是 C 风格字符串字面量(如"topic"可以,但"topic\0"会截断) - 若需要确认送达,要实现
delivery_complete_callback并注册,否则无法知道QoS > 0消息是否被 broker 确认
mqtt::async_client cli("tcp://localhost:1883", "cpp_client");
cli.set_connection_lost_callback([](const std::string& cause) {
std::cout << "Connection lost: " << cause << std::endl;
});
cli.connect()->wait(); // wait() 是阻塞的,但 connect() 返回 future,可改为 async + then
auto tok = cli.publish("sensor/temp", "23.5", 1, false);
tok->wait(); // 这里 wait() 是等发布动作提交完成,不是等 broker 确认为什么订阅后收不到消息?检查 callback 注册和事件循环
paho-mqtt-cpp 的异步订阅本身不自动触发回调——它只把消息推到内部队列,你必须显式调用 cli.start_consuming() 启动消费线程,或手动轮询 cli.consume_message()。没这一步,message_arrived_callback 根本不会被调用。
常见错误是:调用了 subscribe(),也注册了 set_message_arrived_callback(),但忘了启动消费机制,结果消息静默丢失。
立即学习“C++免费学习笔记(深入)”;
-
start_consuming()启动后台线程,适合长期运行程序;但它不能中途停,只能靠stop_consuming()(注意:不是所有版本都支持 stop) - 若需精细控制,改用
consume_message(std::chrono::milliseconds)在主循环中非阻塞拉取,超时返回空指针 - callback 函数签名必须严格为
void(const mqtt::message&),捕获异常会导致整个消费线程崩溃(建议加 try/catch)
cli.set_message_arrived_callback([](const mqtt::message& msg) {
try {
std::cout << "Received: " << msg.get_payload_str() << " on " << msg.get_topic() << std::endl;
} catch (...) { /* 防止异常逃逸 */ }
});
cli.subscribe("sensor/#", 1)->wait();
cli.start_consuming(); // 必须有!QoS 1/2 发布失败却没报错?看 delivery_complete_callback 是否被触发
异步发布下,publish() 成功只表示消息已入发送队列,不代表 broker 已接收。QoS 1 或 2 的消息只有收到 PUBACK/PUBREC 才算真正送达,这个状态通过 delivery_complete_callback 通知。
如果你没设这个回调,或者回调里没做日志/计数,就完全不知道消息是否落地——尤其在网络抖动时,broker 可能丢弃未确认消息,而客户端毫无感知。
- 回调参数是
const mqtt::idelivery_token&,调用token.get_message_id()可关联原始消息 - 同一个 token 可能被多次回调(比如重传),所以不要在回调里释放资源,除非确认
token.is_complete() - 如果 broker 主动断连,未确认的消息会进入
offline_buffer(默认大小 1000),超出后新消息直接丢弃,且不报错
cli.set_delivery_complete_callback([](mqtt::idelivery_token_ptr tok) {
if (tok && tok->is_complete()) {
std::cout << "Message delivered, ID=" << tok->get_message_id() << std::endl;
}
});链接断开后自动重连失效?别只依赖 set_automatic_reconnect()
set_automatic_reconnect() 只控制 TCP 层重连行为,它不会自动恢复订阅(subscribe)、也不会重新绑定 callback。断连重连后,你得手动补订主题,否则消息收不到。
更隐蔽的问题是:重连成功触发 connected_callback,但此时 client 内部状态可能还没完全就绪(比如 session 还没重建),立刻 subscribe() 可能失败并抛异常。
- 务必在
connected_callback里再次调用subscribe(),且对返回的itoken调用wait()或监听其完成 - 避免在重连回调里做耗时操作(如文件读写),否则阻塞重连流程
- 测试时用
mosquitto_sub -t 'test' -d手动模拟 broker 断连,比单纯 kill 进程更能暴露状态不同步问题
自动重连不是银弹——它解决不了会话丢失、消息重复、订阅漂移这些 MQTT 协议层问题。











