【阅读笔记】rocketmq 特性实现 —— 拉取消息长轮询

分析 rocketmq 如何通过长轮询的方式实现消息拉取

前言

rocketmq 消息消费采用 pull 方式实现, consumer 向 broker 发送请求以获取最新的消息。

如果 consumer 发送请求时 broker 并没有新消息, 那么消息拉取的请求可能大半是无用的, 浪费带宽和 CPU。

所以 rocketmq 的实现是使用长轮询的方式:当 broker 处理 PULL 请求时,如果没有新的消息,它不会立即返回数据给 consumer;当有新的消息发到 broker 或过了轮询时间后,它才再查一次消息并返回结果给 consumer。

时序图如下

长轮询时序

实现代码

consumer 端实现

consumer 端不需要特别处理

broker 端实现

拉取消息时,如果没有新消息就先挂起请求,暂不返回响应

// Handles pull-message requests.
// Excerpt: parts of the original class are elided with "...".
public class PullMessageProcessor implements NettyRequestProcessor {

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException {

...

// Look the messages up in the message store.
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getQueueOffset(),
requestHeader.getMaxMsgNums(),
messageFilter);

if (getMessageResult != null) {
...

switch (response.getCode()) {
...

case ResponseCode.PULL_NOT_FOUND:
// Author's note: brokerAllowSuspend is true when the PULL request comes from a consumer,
// and false when the broker itself re-issues the request.
// hasSuspendFlag is controlled by the request header (FLAG_SUSPEND) and defaults to true.
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;

// When long polling is disabled, fall back to "short" polling.
// Author's note: suspendTimeoutMillisLong defaults to 15 * 1000, the "short" polling time to 1000.
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}

String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();

// Build a new PullRequest (carrying the original command, the channel, the polling
// timeout and the current store time) to be parked in PullRequestHoldService.
PullRequest pullRequest = new PullRequest(request,
channel,
pollingTimeMills,
this.brokerController.getMessageStore().now(),
offset,
subscriptionData,
messageFilter);

/* Hand the request over to PullRequestHoldService.
*/
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null; // null response: nothing is sent back to the consumer for now
break;
}
}
}
}
}

PullRequestHoldService 过一段时间后尝试拉消息

// Holds suspended pull requests and re-executes them once new messages arrive or they time out.
// Excerpt: parts of the original class are elided with "...".
public class PullRequestHoldService extends ServiceThread {

...

// Suspended requests, grouped per queue; the key is built by buildKey(topic, queueId).
private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable = new ConcurrentHashMap<String, ManyPullRequest>(1024);

...

/**
* Accepts a pull request and parks it under its topic/queue key.
*/
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
// putIfAbsent: if another thread registered an entry first, use that one instead.
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}

// Author's note (ManyPullRequest is not shown here): it has a single member,
// ArrayList<PullRequest> pullRequestList, which stores the requests and is guarded by synchronized.
mpr.addPullRequest(pullRequest);
}

@Override
public void run() {
log.info("{} service started", this.getServiceName());

while (!this.isStopped()) {
try {
// Wait between checks: 5 s with long polling enabled, otherwise the configured short-polling time.
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}

/* Check the held requests; those that timed out (or now have messages) are re-executed as pull requests.
*/
long beginLockTimestamp = this.systemClock.now();
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;

// Log when one check pass took longer than 5 s.
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}

log.info("{} service end", this.getServiceName());
}

...

// Walks every key in the table and re-evaluates its held requests against the queue's current max offset.
private void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
// Keys that do not split into exactly two parts (topic, queueId) are skipped.
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}

// Convenience overload without filter data (null tagsCode, bit map and properties; store time 0).
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}

/* Author's note: DefaultMessageStore calls this method after it receives a new message.
* PullRequestHoldService also calls it periodically itself (via checkHoldRequest).
*/
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset,
final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {

String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);

if (mpr != null) {
// Take the held requests out of the entry; the ones that must keep waiting are added back below.
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {

// Requests that have to keep waiting.
List<PullRequest> replayList = new ArrayList<PullRequest>();

for (PullRequest request : requestList) {

// maxOffset is the queue's newest offset as known by the caller of this method.
// If it is not beyond the offset the request pulls from (<=), re-read the
// current max offset from the message store.
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}

// There are messages past the requested offset: apply the request's message filter.
if (newestOffset > request.getPullFromThisOffset()) {
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
// match by bit map, need eval again when properties is not null.
if (match && properties != null) {
match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
}

if (match) {
try {
/* Author's note (executeRequestWhenWakeup is not shown here): this submits a task to the
* pull-message executor which runs PullMessageProcessor#processRequest with
* brokerAllowSuspend set to false, so this time a response is returned to the
* consumer whether or not new messages were read.
*/
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}

// The request has been handed off: skip the timeout check and do not re-queue it.
continue;
}
}

// The request has waited for its full timeout: re-execute it as well.
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}

// Neither matched nor timed out: keep holding the request.
replayList.add(request);
}

// Put the still-waiting requests back.
if (!replayList.isEmpty()) {
mpr.addPullRequest(replayList);
}
}
}
}
}

另外,如果有新的消息,Message Store 会通知到 PullRequestHoldService,由它唤醒被挂起的请求,直接把新消息返回给 consumer

// Excerpt: only the part of the store that notifies long-polling waiters is shown ("..." marks elided code).
public class DefaultMessageStore implements MessageStore {

...


// Author's note: doReput is invoked after messages have been stored to the commit log.
class ReputMessageService extends ServiceThread {

...

private void doReput() {
...

// Only a non-SLAVE broker with long polling enabled notifies the listener.
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() &&
DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {

// Long polling is enabled: notify that a message has arrived.
// Author's note: messageArrivingListener is a NotifyMessageArrivingListener.
// The offset passed on is the dispatched message's consume-queue offset + 1.
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}

...
}

// Service loop: sleep 1 ms, then run doReput, until the service is stopped.
@Override
public void run() {
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
}
}
}

// Forwards message-arrival notifications to PullRequestHoldService.
// Excerpt: the initialisation of pullRequestHoldService is not shown.
public class NotifyMessageArrivingListener implements MessageArrivingListener {

private final PullRequestHoldService pullRequestHoldService;

// Passes all arguments through unchanged to PullRequestHoldService#notifyMessageArriving.
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {

this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties);
}
}