分析 rocketmq 消息事务如何实现
前言
消息事务将消息的发送拆分为两个阶段。
第一阶段是 prepare, producer 发送 prepare 消息到 broker,然后执行本地事务,
第二阶段是 confirm, producer 执行完本地事务后向 broker 发送 confirm,消息生效。
通过这种方式可以解决两个系统的分布式事务问题:A(存在DB操作)、B(存在DB操作)两方需要保证分布式事务一致性,通过引入中间层 MQ,A 和 MQ 保持事务一致性(异常情况下通过 MQ 反查 A 接口实现 check ),B 和 MQ 保证事务一致(通过重试),从而达到最终事务一致性。
原理
- producer 指定一个 TransactionListener 的实现类
- producer 发送消息(这是 prepare 消息)成功后,会执行 TransactionListener 的 executeLocalTransaction 方法执行本地事务
- executeLocalTransaction 执行成功后,返回 LocalTransactionState.COMMIT_MESSAGE 标志,然后发送 confirm 到 broker
- prepare 消息发送成功后, broker 会等待 confirm 消息,如果一定时间过去后还没收到,则到 producer 查询,这时会调用 TransactionListener 的 checkLocalTransaction 方法检查本地事务是否完成
prepare 消息其实是保存到了一个特殊的 topic,当消息提交后,再发回原来的 topic。
示例
1 |
|
实现
producer 端实现
1 |
|
broker 端实现
处理 prepare 消息, 将消息保存到一个特殊的队列
1 |
|
处理 END_TRANSACTION 命令 (confirm)
1 |
|
本地事务检查
1 |
|