0

0

SpringBoot如何整合RocketMQ事务、广播以及顺序消息

WBOY

WBOY

发布时间:2023-05-18 10:04:05

|

2133人浏览过

|

来源于亿速云

转载

环境:springboot2.3.9RELEASE + RocketMQ4.8.0

依赖

   org.springframework.boot     spring-boot-starter-web       org.apache.rocketmq     rocketmq-spring-boot-starter     2.2.0 

配置文件

server:   port: 8080 --- rocketmq:   nameServer: localhost:9876   producer:     group: demo-mq

普通消息

发送

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String message) {   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }

接受

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener {      @Override     public void onMessage(String message) {         System.out.println("接收到消息:" + message) ;     }  }

顺序消息

发送

@Resource private RocketMQTemplate rocketMQTemplate ;  public void sendOrder(String topic, String message, String tags, int id) {     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),              "order-" + id, new SendCallback() {                 @Override                 public void onSuccess(SendResult sendResult) {                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;                 }                 @Override                 public void onException(Throwable e) {                     e.printStackTrace() ;                 }             }); }

这里是根据hashkey将消息发送到不同的队列中

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",      selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener {      @Override     public void onMessage(String message) {         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;     }  }

consumeMode = ConsumeMode.ORDERLY,指明了消息模式为顺序模式,一个队列,一个线程。

结果

SpringBoot如何整合RocketMQ事务、广播以及顺序消息

当consumeMode = ConsumeMode.CONCURRENTLY执行结果如下:

SpringBoot如何整合RocketMQ事务、广播以及顺序消息

集群/广播消息模式

发送端

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String topic, String message, String tags) {     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }

集群消息模式

消费端

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }

messageModel = MessageModel.CLUSTERING

测试

启动两个服务分别端口是8080,8081

8080服务

SpringBoot如何整合RocketMQ事务、广播以及顺序消息

8081服务

SpringBoot如何整合RocketMQ事务、广播以及顺序消息

集群消息模式下,每个服务分别接收一部分消息,实现了负载均衡

广播消息模式

消费端

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }

messageModel = MessageModel.BROADCASTING

测试

启动两个服务分别端口是8080,8081

8080服务

SpringBoot如何整合RocketMQ事务、广播以及顺序消息

8081服务

SpringBoot如何整合RocketMQ事务、广播以及顺序消息

集群消息模式下,每个服务分别都接受了同样的消息。

事务消息

RocketMQ事务的3个状态

TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息

讯飞听见会议
讯飞听见会议

科大讯飞推出的AI智能会议系统

下载

TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程 整体流程为:

正常事务发送与提交阶段

1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)

2、服务端响应消息写入结果,半消息发送成功

3、开始执行本地事务

4、根据本地事务的执行状态执行Commit或者Rollback操作

事务信息的补偿流程

1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求

2、生产者收到确认回查请求后,检查本地事务的执行状态

3、根据检查后的结果执行Commit或者Rollback操作

补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

发送端

@Resource private RocketMQTemplate rocketMQTemplate ;      public void sendTx(String topic, Long id, String tags) {     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(             new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).             setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),              UUID.randomUUID().toString().replaceAll("-", "")) ; }

生产者对应的监听器

@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener {          @Resource     private BusinessService bs ;      @Override     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {         // 这里执行本地的事务操作,比如保存数据。         try {             // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据             String id = (String) msg.getHeaders().get("BID") ;             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;             System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ;             bs.save(users, new UsersLog(users.getId(), id)) ;         } catch (Exception e) {             e.printStackTrace() ;             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }      @Override     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {         // 这里检查本地事务是否执行成功         String id = (String) msg.getHeaders().get("BID") ;         System.out.println("执行查询ID为:" + id + " 的数据是否存在") ;         UsersLog usersLog = bs.queryUsersLog(id) ;         if (usersLog == null) {             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }  }

消费端

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener {      @Override     public void onMessage(Users users) {         System.out.println("TX接收到消息:" + users) ;     }  }

Service

@Transactional public boolean save(Users users, UsersLog usersLog) {     usersRepository.save(users) ;     usersLogRepository.save(usersLog) ;     if (users.getId() == 1) {         throw new RuntimeException("数据错误") ;     }     return true ; }      public UsersLog queryUsersLog(String bid) {     return usersLogRepository.findByBid(bid) ; }

Controller

@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) {     ps.sendTx("tx-topic", id, "tag10") ;     return "send transaction success" ; }

测试

调用接口后,控制台输出:

SpringBoot如何整合RocketMQ事务、广播以及顺序消息

从打印日志看出来都保存完毕了后 消费端才接受到消息。

SpringBoot如何整合RocketMQ事务、广播以及顺序消息

SpringBoot如何整合RocketMQ事务、广播以及顺序消息

删除数据,再测试ID为1会报错的。

SpringBoot如何整合RocketMQ事务、广播以及顺序消息

数据库中没有数据。。。

是不是也不是很复杂,2个阶段来处理。

相关专题

更多
excel制作动态图表教程
excel制作动态图表教程

本专题整合了excel制作动态图表相关教程,阅读专题下面的文章了解更多详细教程。

20

2025.12.29

freeok看剧入口合集
freeok看剧入口合集

本专题整合了freeok看剧入口网址,阅读下面的文章了解更多网址。

65

2025.12.29

俄罗斯搜索引擎Yandex最新官方入口网址
俄罗斯搜索引擎Yandex最新官方入口网址

Yandex官方入口网址是https://yandex.com;用户可通过网页端直连或移动端浏览器直接访问,无需登录即可使用搜索、图片、新闻、地图等全部基础功能,并支持多语种检索与静态资源精准筛选。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

197

2025.12.29

python中def的用法大全
python中def的用法大全

def关键字用于在Python中定义函数。其基本语法包括函数名、参数列表、文档字符串和返回值。使用def可以定义无参数、单参数、多参数、默认参数和可变参数的函数。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

16

2025.12.29

python改成中文版教程大全
python改成中文版教程大全

Python界面可通过以下方法改为中文版:修改系统语言环境:更改系统语言为“中文(简体)”。使用 IDE 修改:在 PyCharm 等 IDE 中更改语言设置为“中文”。使用 IDLE 修改:在 IDLE 中修改语言为“Chinese”。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

16

2025.12.29

C++的Top K问题怎么解决
C++的Top K问题怎么解决

TopK问题可通过优先队列、partial_sort和nth_element解决:优先队列维护大小为K的堆,适合流式数据;partial_sort对前K个元素排序,适用于需有序结果且K较小的场景;nth_element基于快速选择,平均时间复杂度O(n),效率最高但不保证前K内部有序。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

12

2025.12.29

php8.4实现接口限流的教程
php8.4实现接口限流的教程

PHP8.4本身不内置限流功能,需借助Redis(令牌桶)或Swoole(漏桶)实现;文件锁因I/O瓶颈、无跨机共享、秒级精度等缺陷不适用高并发场景。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

134

2025.12.29

抖音网页版入口在哪(最新版)
抖音网页版入口在哪(最新版)

抖音网页版可通过官网https://www.douyin.com进入,打开浏览器输入网址后,可选择扫码或账号登录,登录后同步移动端数据,未登录仅可浏览部分推荐内容。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

63

2025.12.29

快手直播回放在哪看教程
快手直播回放在哪看教程

快手直播回放需主播开启功能才可观看,主要通过三种路径查看:一是从“我”主页进入“关注”标签再进主播主页的“直播”分类;二是通过“历史记录”中的“直播”标签页找回;三是进入“个人信息查阅与下载”里的“直播回放”选项。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

18

2025.12.29

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Redis6入门到精通超详细教程
Redis6入门到精通超详细教程

共47课时 | 5.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号