【阅读笔记】rocketmq 特性实现 —— 消息消费

通过源码分析 rocketmq 消息产生与消费如何实现

PUSH 模型消息消费示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

/**
 * Example of a PUSH-model consumer: joins group "consumer1", subscribes to every tag of
 * "TopicTest" in clustering mode and prints one line per delivered batch.
 *
 * @param argv argv[0] is used as the consumer instance name and as the prefix of each printed line
 * @throws Exception propagated from the consumer API calls
 */
public static void main(String [] argv) throws Exception {
    DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("consumer1");
    final String instanceName = argv[0];

    pushConsumer.setInstanceName(instanceName);
    pushConsumer.setNamesrvAddr("127.0.0.1:9876");
    // Where to start consuming when this consumer joins for the first time.
    pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // "*" subscribes to all tags.
    pushConsumer.subscribe("TopicTest", "*");
    // Clustering or broadcasting.
    pushConsumer.setMessageModel(MessageModel.CLUSTERING);

    // The context type picks the mode: ConsumeConcurrentlyContext means concurrent
    // consumption, ConsumeOrderlyContext means ordered consumption.
    pushConsumer.registerMessageListener((List<MessageExt> batch, ConsumeConcurrentlyContext ctx) -> {
        System.out.println(instanceName + "收到消息");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });

    pushConsumer.start();
}

consumer 端请求消息

PUSH 模型其实是通过 PULL 实现,关键是 RebalanceService 和 PullMessageService。

RebalanceService 负责定期产生 PullRequest,表示要从哪里拉取消息

PullMessageService 负责从阻塞队列中取出 PullRequest 并处理:到 broker 拉取消息并消费

生成 PullRequest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201

/**
 * Background service that repeatedly asks the client instance to rebalance.
 * The pause between rounds comes from the system property
 * "rocketmq.client.rebalance.waitInterval" and defaults to 20000 (20 s).
 */
public class RebalanceService extends ServiceThread {
    private static long waitInterval =
        Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
    private final MQClientInstance mqClientFactory;

    /** Loops until the service is stopped: wait for the interval, then trigger one rebalance. */
    public void run() {
        while (true) {
            if (isStopped()) {
                return;
            }
            waitForRunning(waitInterval);
            mqClientFactory.doRebalance();
        }
    }
}

// MQClientInstance objects are managed by the singleton MQClientManager.
public class MQClientInstance {

...

// Triggers a rebalance on every consumer held in consumerTable.
// A Throwable raised by one consumer is logged and does not stop the remaining consumers from being processed.
public void doRebalance() {
// DefaultMQPushConsumerImpl and DefaultMQPullConsumerImpl are added to consumerTable when they start.
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance(); // in this walkthrough impl is a DefaultMQPushConsumerImpl
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
...
}

// Excerpt: the rebalance entry point of the PUSH consumer implementation.
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
...

// Delegates to rebalanceImpl unless the consumer is paused, passing along whether consumption is orderly.
public void doRebalance() {
if (!this.pause) {
// rebalanceImpl is a RebalancePushImpl
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}

...
}

// RebalancePushImpl extends RebalanceImpl.
public abstract class RebalanceImpl {

// Queues currently assigned to this consumer, each mapped to the ProcessQueue that holds its pulled messages.
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);

...

// Rebalances every subscribed topic in turn, then calls truncateMessageQueueNotMyTopic().
// A failure on one topic is caught (and logged unless it is a retry topic) so the other topics are still processed.
public void doRebalance(final boolean isOrder) {
// Subscription data of this consumer; one consumer may subscribe to several topics.
// Entries are added when subscribe is called.
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// Every subscribed topic produces PullRequests.
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}

this.truncateMessageQueueNotMyTopic();
}

// Load-balances a single topic.
// All queues of the topic, across all brokers, are pooled together and this method decides which of them this consumer pulls from.
// Example: with 2 consumers in the consumer group and 4 queues, each consumer takes 2 queues, which keeps consumption balanced.
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
// mqSet is the full set of message queues of this topic.
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
// In the BROADCASTING model every queue must be consumed, so the whole mqSet is passed.
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info(...);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}

case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

// Ids of all consumers that belong to the same consumer group.
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn(...);
}
}

if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}

if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);

// Both lists are sorted before being handed to the allocation strategy.
Collections.sort(mqAll);
Collections.sort(cidAll);

// Load-balancing strategy; the default is AllocateMessageQueueAveragely.
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

/* Let the strategy pick the group of queues for this client: allocateResult.
*/
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
} catch (Throwable e) {
log.error(...);
return;
}

// Convert the List to a Set to guarantee uniqueness.
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}

// Only the queues in allocateResultSet will be pulled from.
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(...);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}

// Brings processQueueTable in line with mqSet and creates a PullRequest for every newly added queue.
// Returns the `changed` flag, which is set to true below whenever a new queue is added
// (the flag is declared in the elided code at the top of the method).
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
// Queues that are in processQueueTable but not in mqSet are removed from processQueueTable.
...

// Prepare the PullRequests to generate.
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
// A PullRequest is only needed for queues that are not yet in processQueueTable.
if (!this.processQueueTable.containsKey(mq)) {
...

// Locate the position to consume from.
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
// putIfAbsent returns the existing value if an entry for mq was already present.
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);

/* Build the PullRequest.
*/
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup); // consumerGroup
pullRequest.setNextOffset(nextOffset); // position to consume from
pullRequest.setMessageQueue(mq); // which queue: broker, topic, queueId
pullRequest.setProcessQueue(pq); // the ProcessQueue stores the messages that are pulled back
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}

/* In the PULL model dispatchPullRequest does nothing.
* In the PUSH model dispatchPullRequest calls defaultMQPushConsumerImpl#executePullRequestImmediately,
* which hands the PullRequest over to PullMessageService.
*/
dispatchPullRequest(pullRequestList);

return changed;
}

...
}

消息拉取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236

// Pulls messages from the broker according to queued PullRequests.
public class PullMessageService extends ServiceThread {

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

...

// Service loop: until stopped, takes the next PullRequest (take() blocks while the queue is empty) and processes it.
public void run() {
while (!this.isStopped()) {
try {
// The PullRequest queue: as soon as a request is available, try to pull messages from the broker.
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (...) {
...
}
}
}

// Looks up the consumer of the request's consumer group and lets it perform the pull; logs a warning if none is found.
private void pullMessage(final PullRequest pullRequest) {
// Pick the consumer from consumerTable.
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn(...);
}
}
}

// Excerpt of the PUSH consumer implementation: issuing the pull to the broker.
public class DefaultMQPushConsumerImpl implements MQConsumerInner {

// Stores consume offsets.
// Two implementations exist: LocalFileOffsetStore and RemoteBrokerOffsetStore.
// BROADCASTING mode uses LocalFileOffsetStore, CLUSTERING mode uses RemoteBrokerOffsetStore.
private OffsetStore offsetStore;

// Message consumption service.
// Two implementations exist: ConsumeMessageConcurrentlyService and ConsumeMessageOrderlyService.
private ConsumeMessageService consumeMessageService;
...

// Issues an asynchronous pull for the queue named by pullRequest.
// If the topic has no subscription data, or pullKernelImpl throws, the request is rescheduled via executePullRequestLater.
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();

...

// Consumer-side flow control lives here: if previously pulled messages have not been consumed yet, pulling is postponed. Skipped for now.

...

final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}

// Callback invoked once the pull returns; its code is shown further below.
PullCallback pullCallback = new PullCallback() { ... }

...

try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(), // the queue to pull from
subExpression, // message filter expression
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(), // message position
this.defaultMQPushConsumer.getPullBatchSize(), // how many messages to pull per batch
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC, // asynchronous pull
pullCallback // callback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}

...
}

// Excerpt: wraps the low-level pull call made on behalf of a consumer.
public class PullAPIWrapper {
...

// Builds a PullMessageRequestHeader for mq and sends it through MQClientAPIImpl.
// Returns whatever MQClientAPIImpl#pullMessage returns.
// Throws MQClientException when no broker address is found, even after refreshing the topic route from the name server.
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

// Find the broker address; on a miss, refresh the route info once and look it up again.
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), recalculatePullFromWhichNode(mq), false);
}

if (findBrokerResult != null) {
{
// Check the broker version; throw if it is too old.
...
}

// When pulling from a slave, clear the FLAG_COMMIT_OFFSET bit; that flag makes the broker persist the consume offset.
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}

PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);

// Adjust the broker address when the class-filter flag is set.
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}

// All requests are sent through MQClientAPIImpl.
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);

return pullResult;
}

throw new MQClientException(...);
}

...
}

// Excerpt: the client-side API that actually sends the PULL_MESSAGE command.
public class MQClientAPIImpl {
...

// Sends a PULL_MESSAGE request to addr, dispatching on communicationMode:
// ASYNC sends the request with a callback and returns null; SYNC waits and returns the PullResult;
// ONEWAY and unknown modes hit `assert false` and return null.
public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);

switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}

return null;
}

// Asynchronous pull: the outcome is delivered to pullCallback (onSuccess with the converted PullResult, onException otherwise).
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
// Convert the response into a PullResult.
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);

assert pullResult != null;

// Invoke the callback.
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {
// Error handling: pullCallback.onException
...
}
}
});
}

// Synchronous pull: invokes the broker and converts the response into a PullResult.
private PullResult pullMessageSync(
final String addr,
final RemotingCommand request,
final long timeoutMillis
) throws RemotingException, InterruptedException, MQBrokerException {

RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processPullResponse(response);
}


...
}

消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215



// Excerpt of the PUSH consumer implementation: what happens after a pull returns.
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    ...

    /**
     * Issues a pull for {@code pullRequest}; this excerpt shows only the callback that
     * handles the pull outcome.
     *
     * @param pullRequest names the queue to pull from; its next offset is advanced by the callback
     */
    public void pullMessage(final PullRequest pullRequest) {
        ...

        /* Callback invoked once the pull comes back. */
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {

                if (pullResult != null) {
                    // Post-process the raw result for this queue and subscription;
                    // messages may be filtered out in this step.
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                        pullRequest.getMessageQueue(), pullResult, subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND: // messages were pulled
                            ...

                            // Everything may have been filtered out by processPullResult.
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                ...

                                // Store the pulled messages.
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

                                /* Consume the messages.
                                 * In this walkthrough consumeMessageService is a ConsumeMessageConcurrentlyService.
                                 */
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);


                                // Pull the next batch: delayed when a pull interval is configured, otherwise at once.
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            ...
                            break;

                        case NO_NEW_MSG: // no new messages
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG: // new messages exist, but none match
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            // Must not fall through: the OFFSET_ILLEGAL handling below is
                            // only meant for an invalid offset.
                            break;
                        case OFFSET_ILLEGAL: // the offset argument is invalid
                            ... // update the offset
                            break;
                        default:
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                // Failures on retry topics are not logged.
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }

                // Retry the same request after the exception delay.
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };

        ...
    }
    ...
}

// Excerpt: concurrent consumption of pulled messages on a thread pool.
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

...

// Wraps the batch in a ConsumeRequest and submits it to consumeExecutor.
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
...
// Submit to the thread pool, which runs the message consumption.
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
consumeExecutor.submit(consumeRequest);
...
}

...

// Handles the outcome of one ConsumeRequest: deals with messages after ackIndex according to the
// message model, removes the request's remaining messages from the ProcessQueue and updates the stored offset.
// (ackIndex is computed in the elided code.)
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest) {

...

switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
... // log the messages whose consumption failed
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
/* Send the message back to the broker so that it is consumed again.
* If sendMessageBack succeeds it returns true and the message does not need to be re-consumed locally;
* otherwise the message is added to msgBackFailed and re-consumed locally.
*/
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); // number of re-consumption attempts
msgBackFailed.add(msg);
}
}

if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
// Resubmit to submitConsumeRequest after a 5 s delay to consume them once more.
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;

default:
break;
}

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// Update the offset held in the offset store:
// for BROADCASTING the local store is updated,
// for CLUSTERING the broker is updated.
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}


// One unit of work for the consume thread pool: a batch of messages from a single queue.
class ConsumeRequest implements Runnable {

// Runs the registered listener on the batch, surrounded by the optional before/after hooks,
// then hands the status to processConsumeResult unless the ProcessQueue was dropped.
public void run() {
...

// The listener registered through registerMessageListener.
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;

// If a message hook is registered, call executeHookBefore.
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
// Messages that came from a retry queue need their topic restored to the original topic.
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);

// Stamp every message with the time its consumption started.
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}

/* Invoke the listener to consume the messages; it receives an unmodifiable view of the batch.
*/
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(...);
hasException = true;
}

...


// If a message hook is registered, call executeHookAfter.
// NOTE(review): status.toString() requires a non-null status; presumably the elided code above
// replaces a null status after an exception - confirm against the full source.
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}

...

// Process the consumption result.
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
}
}

broker 端处理拉取消息请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252

// Pull-message commands are handled by PullMessageProcessor.
public class PullMessageProcessor implements NettyRequestProcessor {

// Entry point: delegates to the private overload with brokerAllowSuspend = true.
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
return this.processRequest(ctx.channel(), request, true);
}

// Reads messages from the message store for the requested topic/queue/offset and builds the response.
// Returns null when the messages were already written to the channel through the zero-copy path.
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {

// The response.
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();

// The request header.
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

// Check that the request is valid and that the subscription exists.
// Fetch the subscription data in preparation for message filtering.
...

/* Fetch the messages from the message store.
*/
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), // topic
requestHeader.getQueueId(), // queue id
requestHeader.getQueueOffset(), // the offset to read from
requestHeader.getMaxMsgNums(), // maximum number of messages to read
messageFilter); // used to filter messages

if (getMessageResult != null) {
// Copy the offset information from getMessageResult into response and responseHeader.
...

// Decide whether the consumer should pull from a slave.
...

switch (getMessageResult.getStatus()) {
// Set the members of response according to the status code.
...
}

// Run the consume-message hook.
if (this.hasConsumeMessageHook()) {
ConsumeMessageContext context = new ConsumeMessageContext();

// Fill in the context values.
...

executeConsumeMessageHookBefore(context);
}

// response.getCode() was already set in the switch above.
switch (response.getCode()) {
case ResponseCode.SUCCESS:
// Record statistics.
...

// As the name suggests: copy the messages onto the heap and then send them (the default configuration).
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();

final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
(int) (this.brokerController.getMessageStore().now() - beginTimeMills));

/* Set the body of the response.
*/
response.setBody(r);
} else {
try {
// FileRegion is netty's tool for zero-copy transfer.
FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);

// Write the messages to the consumer using zero-copy.
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Release the result once the write has completed, whether it succeeded or not.
getMessageResult.release();
if (!future.isSuccess()) {
log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
log.error("transfer many message by pagecache exception", e);
getMessageResult.release();
}

response = null; // set response to null so that no response is written again
}
break;

case ResponseCode.PULL_NOT_FOUND:
// Long polling.
...
break;

case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;

case ResponseCode.PULL_OFFSET_MOVED:
// Indicates a problem with the offset.
...
break;

default:
assert false;
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}

// The offset is stored only when suspension is allowed, the commit-offset flag is set
// (hasCommitOffsetFlag comes from the elided code) and this broker is not a SLAVE.
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable &&
this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;

if (storeOffsetEnable) {
// Record the offset in ConsumerOffsetManager, which tracks the consumption progress
// of each consumer group on each queue.
this.brokerController.getConsumerOffsetManager().commitOffset(
RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getCommitOffset());
}

return response;
} // End of processRequest
}

// Excerpt: the broker-side store that serves pull requests.
public class DefaultMessageStore implements MessageStore {

// Reads a batch of messages for (topic, queueId) starting at the given consume-queue offset.
// The consume queue is read first to locate each message in the commit log, then the message bytes are read from the commit log.
// The returned GetMessageResult carries the status, the next offset to pull from and the min/max offsets.
// (status, nextBeginOffset, minOffset, maxOffset, maxOffsetPy, i and related locals are declared in the elided code.)
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {

...

GetMessageResult getResult = new GetMessageResult();

ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
// Validate the offset.
...

// Read the consume queue first to learn where the messages are located in the commit log.
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {

try {
...

/* Read a batch of messages from the commit log.
*/
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // start position
int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // message size
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // hash of the tag

maxPhyOffsetPulling = offsetPy;

// Check whether we need to skip ahead to the next commit log file.
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}

// Check whether the data at offsetPy resides on disk.
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

// The batch is full: leave the loop.
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {
break;
}

// Message filtering (isMatchedByConsumeQueue).
...

/* Read the message from the commit log.
*/
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);

if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}

// Skip ahead to the next commit log file.
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}

// Message filtering (isMatchedByCommitLog).
...

// Statistics.
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();

// Add the data that was read to the result.
getResult.addMessage(selectResult);

status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;

}

if (diskFallRecorded) {
long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
}

// i counts bytes of consume-queue entries scanned; dividing by the entry size gives the number of entries to advance.
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

// Suggest pulling from a slave when the unread backlog exceeds the configured share of physical memory.
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);

} finally {
bufferConsumeQueue.release();
}
} else {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn(...);
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}

// Message statistics.
...

getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
} // End of getMessage
}