[Reading Notes] RocketMQ feature implementation: ordered messages

These notes walk through the client source code to describe how ordered messages are implemented.

Preface

When RocketMQ consumes messages, the messages of a single queue are pulled in FIFO order, but only the pulling is ordered; consumption itself is not guaranteed to happen in order.

So to get ordered consumption, the producer must make sure that the messages which need ordering are sent to the same queue (by default messages are spread evenly across the topic's queues),

and at the same time the consumer side must make sure that each queue is pulled and consumed by a single consumer, serially.
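
For contrast, the default spread looks roughly like the sketch below (a hypothetical RoundRobinQueuePicker, not the real client class; the actual client keeps a per-thread counter inside TopicPublishInfo): consecutive sends rotate over the topic's queues, which is exactly what breaks ordering across messages.

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.common.message.MessageQueue;

// Hypothetical illustration of the default behaviour: every send moves to the next queue.
final class RoundRobinQueuePicker {
    private final AtomicInteger counter = new AtomicInteger();

    MessageQueue pick(List<MessageQueue> queues) {
        int index = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(index);
    }
}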

Producer example code

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();

    MessageQueueSelector selector = new OrderlyMessageQueueSelector();

    Message msg1 = new Message(
        "OrderlyTest",                                                  // topic
        ("Orderly Message 1").getBytes(RemotingHelper.DEFAULT_CHARSET)  // body
    );

    // send with a selector (there are also overloads that take a MessageQueue directly)
    producer.send(msg1, selector, 1);

    Message msg2 = new Message(
        "OrderlyTest",                                                  // topic
        ("Orderly Message 2").getBytes(RemotingHelper.DEFAULT_CHARSET)  // body
    );
    producer.send(msg2, selector, 1);

    producer.shutdown();
}

static class OrderlyMessageQueueSelector implements MessageQueueSelector {
    // mqs holds all queues of this topic
    // arg is the extra argument passed to send
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int id = (Integer) arg;       // send(..., 1) above passes an Integer
        int index = id % mqs.size();  // the same arg always maps to the same queue
        return mqs.get(index);
    }
}
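
As the comment in the example notes, the selector can also be skipped entirely: DefaultMQProducer has a send overload that takes a MessageQueue directly. A minimal sketch reusing producer, msg1 and msg2 from the example above (picking index 0 is an arbitrary choice for illustration):

// fetch the topic's queues once and always send to the same one
List<MessageQueue> queues = producer.fetchPublishMessageQueues("OrderlyTest");
MessageQueue fixedQueue = queues.get(0);
producer.send(msg1, fixedQueue);
producer.send(msg2, fixedQueue);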

Message production implementation

This is largely the same as sending a normal message, except that the client no longer picks the queue by itself: the queue is chosen by the user-supplied selector.

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    // there are also async and one-way variants, which work much the same way,
    // plus overloads that take a MessageQueue directly instead of a selector
    @Override
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        return this.defaultMQProducerImpl.send(msg, selector, arg);
    }
}

public class DefaultMQProducerImpl implements MQProducerInner {

    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
    }

    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
    }

    private SendResult sendSelectImpl(Message msg, MessageQueueSelector selector,
        Object arg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        long beginStartTime = System.currentTimeMillis();
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        // look up the queues of this topic
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
                // let the user-supplied selector pick a MessageQueue
                mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
            } catch (Throwable e) {
                throw new MQClientException("select message queue throwed exception.", e);
            }

            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTime) {
                throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
            }
            if (mq != null) {
                // send the message; from here on there is no difference from a normal send
                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
            } else {
                throw new MQClientException("select message queue return null.", null);
            }
        }

        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }
}
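
In practice the arg passed to send is usually a business key such as an order ID, so that all messages belonging to one order land in one queue while different orders still spread across queues. A sketch of such a selector (the OrderIdSelector name and the String-key assumption are mine, not from the RocketMQ source):

static class OrderIdSelector implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        String orderId = (String) arg;                               // business key supplied by the caller
        int index = Math.floorMod(orderId.hashCode(), mqs.size());   // same key -> same queue
        return mqs.get(index);
    }
}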

Consumer example code

public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("OrderlyTest", "*");

    // note: a MessageListenerOrderly is registered here
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt msgExt : msgs) {
                try {
                    String str = new String(msgExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.out.println("received message: " + str);
                } catch (UnsupportedEncodingException e) {
                    // ignore: DEFAULT_CHARSET is always supported
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });

    consumer.start();
}
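
One note on failure handling (my addition; the statuses used here are the ones ConsumeMessageOrderlyService handles further below): an orderly listener signals a retry by returning SUSPEND_CURRENT_QUEUE_A_MOMENT, which pauses the whole queue for a while instead of skipping the failed message:

// hypothetical variant of the listener above: suspend the queue on failure so ordering is preserved
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        try {
            process(msgs); // hypothetical business logic
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            // later messages in this queue will wait until the retry succeeds
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
});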

Message consumption implementation

As with normal clustered consumption, ordered consumption also periodically produces PullRequests and then pulls messages from the broker.

Because a MessageListenerOrderly is registered, DefaultMQPushConsumerImpl treats this consumer as an orderly one:

  1. When a PullRequest is created, the queue is first locked on the broker with the LOCK_BATCH_MQ command (really just a flag), so that other consumers in the same group will not pull from this queue (a sketch of this lock call follows the RebalanceImpl code below).
  2. consumeMessageService will be a ConsumeMessageOrderlyService instead of a ConsumeMessageConcurrentlyService (see the sketch right after this list).
  3. During consumption the ProcessQueue is locked, and the offset is updated only after consumption succeeds, so the same consumer will not pull and consume later messages before the earlier ones are done, which preserves ordering.
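
For point 2, the choice of consume service happens in DefaultMQPushConsumerImpl#start(); roughly (paraphrased from the source, not verbatim):

// inside DefaultMQPushConsumerImpl#start(), simplified
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
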
public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    // stores the consume offsets
    // two implementations: LocalFileOffsetStore and RemoteBrokerOffsetStore
    // BROADCASTING mode uses LocalFileOffsetStore, CLUSTERING mode uses RemoteBrokerOffsetStore
    private OffsetStore offsetStore;

    // the consume service
    // two implementations: ConsumeMessageConcurrentlyService and ConsumeMessageOrderlyService
    private ConsumeMessageService consumeMessageService;
    ...

    public void doRebalance() {
        if (!this.pause) {
            // rebalanceImpl is a RebalancePushImpl here
            // isConsumeOrderly() returns true for an orderly consumer
            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
        }
    }
}

public abstract class RebalanceImpl {

    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);

    ...

    public void doRebalance(final boolean isOrder) {
        // the subscriptions of this consumer; one consumer may subscribe to several topics,
        // entries are added when subscribe() is called
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

    // rebalance by topic
    // the rebalance collects all queues of the topic, across all brokers, and decides which queues this consumer pulls from;
    // e.g. with 2 consumers in the group and 4 queues, each consumer gets 2 queues, which balances the load
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // mqSet is the set of all message queues of this topic
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    // in BROADCASTING mode every consumer consumes every queue, so mqSet (all queues) is used
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info(...);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }

            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

                // find the ids of all consumers in the same consumer group
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn(...);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    ...

                    // the allocation works the same as for normal clustered consumption
                    // and produces Set<MessageQueue> allocateResultSet

                    ...

                    // only the queues in allocateResultSet will be pulled
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(...);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
        // queues that are in processQueueTable but no longer in mqSet are removed from processQueueTable
        ...

        // prepare the PullRequests
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            // only queues not yet in processQueueTable need a new PullRequest
            if (!this.processQueueTable.containsKey(mq)) {
                /* for ordered consumption, lock the mq first;
                 * this lock is not a Java lock: a LOCK_BATCH_MQ request tells the broker that the queue
                 * is now held by this client, so another client that tries to lock the same queue
                 * will find it already taken
                 */
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }

                ...

                // build the PullRequest and add it to pullRequestList

                ...
            }
        }

        /* for the PULL model, dispatchPullRequest does nothing;
         * for the PUSH model, dispatchPullRequest calls defaultMQPushConsumerImpl#executePullRequestImmediately,
         * which hands the PullRequests over to PullMessageService
         */
        dispatchPullRequest(pullRequestList);

        return changed;
    }

    ...
}
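
For completeness, the lock(mq) call above does roughly the following (a simplified sketch paraphrased from memory; exact signatures may differ between versions): it asks the broker, via LOCK_BATCH_MQ, to record that this client now owns the queue, and on success flags the local ProcessQueue as locked.

// simplified sketch of RebalanceImpl#lock(MessageQueue); details paraphrased, not verbatim source
public boolean lock(final MessageQueue mq) {
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (findBrokerResult != null) {
        LockBatchRequestBody requestBody = new LockBatchRequestBody();
        requestBody.setConsumerGroup(this.consumerGroup);
        requestBody.setClientId(this.mQClientFactory.getClientId());
        requestBody.getMqSet().add(mq);
        try {
            // LOCK_BATCH_MQ: the broker remembers "this queue is held by this client" for a while
            Set<MessageQueue> lockedMq = this.mQClientFactory.getMQClientAPIImpl()
                .lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
            for (MessageQueue lockedQueue : lockedMq) {
                ProcessQueue processQueue = this.processQueueTable.get(lockedQueue);
                if (processQueue != null) {
                    processQueue.setLocked(true);                                   // the flag checked by isLocked()
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());  // used by isLockExpired()
                }
            }
            return lockedMq.contains(mq);
        } catch (Exception e) {
            log.error("lockBatchMQ exception, " + mq, e);
        }
    }
    return false;
}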

As with normal clustered consumption, the PullRequest is executed by PullMessageService, and the pulled messages are handed to ConsumeMessageOrderlyService for consumption.
The difference is that ConsumeMessageOrderlyService takes locks while consuming, to guarantee ordering.
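
The per-queue object lock used in the run() method below comes from MessageQueueLock, which is roughly just this (simplified sketch):

// simplified sketch of MessageQueueLock: one plain Object per MessageQueue to synchronize on
public class MessageQueueLock {
    private final ConcurrentMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        Object objLock = this.mqLockTable.get(mq);
        if (null == objLock) {
            objLock = new Object();
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock; // another thread registered the lock object first
            }
        }
        return objLock;
    }
}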

public class ConsumeMessageOrderlyService implements ConsumeMessageService {

    // called by PullCallback once messages have been pulled
    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {

        if (dispathToConsume) {
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }

    ...

    class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;

        ...

        @Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.warn(...);
                return;
            }

            /* messageQueueLock keeps a ConcurrentHashMap internally;
             * fetchLockObject looks up the entry for this messageQueue and,
             * if there is none yet, creates a new Object to be used as the lock
             */
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                // broadcasting mode,
                // or the processQueue is locked and the lock has not expired
                // (this lock is only a flag, set after LOCK_BATCH_MQ succeeded)
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {

                    final long beginTime = System.currentTimeMillis();

                    for (boolean continueConsume = true; continueConsume; ) {
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }

                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {

                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && this.processQueue.isLockExpired()) {

                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

                        long interval = System.currentTimeMillis() - beginTime;
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }

                        final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                        // takeMessags records the messages it takes out together with their offsets
                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                        if (!msgs.isEmpty()) {
                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                            ConsumeOrderlyStatus status = null;
                            ConsumeMessageContext consumeMessageContext = null;

                            // message hook
                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                consumeMessageContext = new ConsumeMessageContext();
                                consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                                consumeMessageContext.setMq(messageQueue);
                                consumeMessageContext.setMsgList(msgs);
                                consumeMessageContext.setSuccess(false);
                                // init the consume context type
                                consumeMessageContext.setProps(new HashMap<String, String>());
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                            }

                            long beginTimestamp = System.currentTimeMillis();
                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                            boolean hasException = false;
                            try {
                                // this lock is taken both while consuming and while pulling messages
                                this.processQueue.getLockConsume().lock();
                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                                    break;
                                }

                                // call the user's listener to consume the messages
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                log.warn(...);
                                hasException = true;
                            } finally {
                                this.processQueue.getLockConsume().unlock();
                            }

                            if (null == status || ConsumeOrderlyStatus.ROLLBACK == status || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                                log.warn("...");
                            }

                            long consumeRT = System.currentTimeMillis() - beginTimestamp;
                            if (null == status) {
                                if (hasException) {
                                    returnType = ConsumeReturnType.EXCEPTION;
                                } else {
                                    returnType = ConsumeReturnType.RETURNNULL;
                                }
                            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                                returnType = ConsumeReturnType.TIME_OUT;
                            } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                                returnType = ConsumeReturnType.FAILED;
                            } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                                returnType = ConsumeReturnType.SUCCESS;
                            }

                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                            }

                            if (null == status) {
                                status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                            }

                            // message hook
                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                consumeMessageContext.setStatus(status.toString());
                                consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                            }

                            ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                messageQueue.getTopic(),
                                consumeRT
                            );

                            /* handle the consume result:
                             * processConsumeResult reads the offsets of the consumed messages from the ProcessQueue
                             * and, on success, updates the OffsetStore;
                             * its return value decides whether the loop keeps going
                             */
                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false;
                        }

                    } // end of for

                } else {
                    ...

                    // try to lock the queue again a little later, then reconsume
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            } // end of synchronized
        }
    }
}
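
Finally, processConsumeResult (not shown above) is where the offset actually moves forward. Roughly, for the two common statuses (a simplified sketch paraphrased from memory, auto-commit path only):

// simplified sketch of ConsumeMessageOrderlyService#processConsumeResult; not verbatim source
public boolean processConsumeResult(List<MessageExt> msgs, ConsumeOrderlyStatus status,
                                    ConsumeOrderlyContext context, ConsumeRequest consumeRequest) {
    boolean continueConsume = true;
    long commitOffset = -1L;

    if (ConsumeOrderlyStatus.SUCCESS == status) {
        // commit() drops the messages taken by takeMessags from the ProcessQueue
        // and returns the offset of the next message to be consumed
        commitOffset = consumeRequest.getProcessQueue().commit();
    } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
        // failure: put the messages back, retry this queue later, and do not advance the offset
        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
        this.submitConsumeRequestLater(consumeRequest.getProcessQueue(),
            consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis());
        continueConsume = false;
    }

    if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        // persist the new offset so the already-consumed messages are never pulled again
        this.defaultMQPushConsumerImpl.getOffsetStore()
            .updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
    }
    return continueConsume;
}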