通过分析代码描述顺序消息的实现
前言
rocketmq 消息消费时,对于同一个队列的消息, 拉取时是 FIFO, 但也只是拉取是顺序的,并不保证消费是顺序的。
所以要想实现消息顺序消费的效果,需要在 producer 确保消息产生时,是发送到同一个队列的(默认是均匀到不同队列)
同时确保 consumer 只拉取一个队列的消息消费
producer 示例代码
1 |
|
产生消息实现
与普通消息的产生差不多,但不需要选择哪个 queue 了
1 |
|
consumer 示例代码
1 |
|
消费消息实现
同集群消息一样,顺序消息也是定期产生 PullRequest,然后再从 broker 拉消息
由于用的是 MessageListenerOrderly, DefaultMQPushConsumerImpl 就会判定是顺序消费
- 产生 PullRequest 时,先用尝试用 LOCK_BATCH_MQ 命令锁定队列(标记位),这样其他同一个 group 下的 consumer 不会从这个队列拉消息
- consumeMessageService 就会用 ConsumeMessageOrderlyService 而不是 ConsumeMessageConcurrentlyService 做消息消费功能
- 消息消费时,给 ProcessQueue 加锁,消费完成后更新 offset,这样前面的消息消费完成前,同一个 consumer 不同拉取取后面的消息,保证到顺序性
1 |
|