【阅读笔记】rocketmq 特性实现 —— 延时消息

分析 rocketmq 如何实现消息定时投递

前言

rocketmq 支持消息定时投递,即 producer 产生消息后, consumer 并不能马上拉取到,而是要经过一段时间后才会被 consumer 拉取。

消息发到 broker 时, broker 先将消息保存到 commit log, 但不会马上写到目标 topic, 而是先写到 SCHEDULE_TOPIC_XXXX 这个特殊的 topic。

ScheduleMessageService 定期读 SCHEDULE_TOPIC_XXXX 中的消息,如果已经到期, 就将消息写到原来的 topic, 这样就实现了延时投递的效果。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

/**
 * Example: sends a single delayed message to "TopicTest" through a producer
 * that talks to the name server at 127.0.0.1:9876.
 *
 * @param args unused
 * @throws Exception if starting the producer, encoding the body, or sending fails
 */
public static void main(String[] args) throws Exception {

    DefaultMQProducer producer = new DefaultMQProducer("producer1");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();

    // Shut the producer down even when building or sending the message throws,
    // so a failed send does not leave the started producer behind.
    try {
        String str = "This is a deliver delayed message";
        Message msg = new Message("TopicTest", "TagA", str.getBytes(RemotingHelper.DEFAULT_CHARSET));

        // Set the delay level.
        // Arbitrary delay durations are not supported; the legal levels are configured by the
        // messageDelayLevel member of MessageStoreConfig.
        // The default is "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
        // NOTE(review): level 2 presumably maps to the second entry (5s) — confirm against the broker config.
        msg.setDelayTimeLevel(2);

        SendResult sendResult = producer.send(msg);
    } finally {
        producer.shutdown();
    }

}

实现代码

写 commit log 时,改变 topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

// Excerpt of CommitLog: when a message carrying a delay level is written,
// its topic is replaced with SCHEDULE_TOPIC_XXXX.
public class CommitLog {

    ...

    /**
     * Stores a message. The excerpt shows only the delayed-message handling:
     * a message whose delay level is greater than 0 is redirected to the
     * schedule topic, and its original destination is kept in its properties.
     *
     * NOTE(review): `topic` and `queueId` are not declared in this excerpt;
     * presumably they are locals set up in the elided code above — confirm.
     */
    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        ...
        // The message carries a delay level property
        if (msg.getDelayTimeLevel() > 0) {

            // Validity check: a level above the maximum is clamped to the maximum
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            // Write to this topic instead of the one the producer asked for
            topic = ScheduleMessageService.SCHEDULE_TOPIC; // SCHEDULE_TOPIC_XXXX

            // Queue id: every delay level has its own queue
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Save the real topic and queue id as message properties.
            // This must happen before the topic and queue id are overwritten below.
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            // Point the message at the schedule topic and the per-level queue
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
        ...
    }

    ...
}

定时查询到期消息,重新投递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175

//
// 设置定时器,定时检查 SCHEDULE_TOPIC_XXXX 中的消息是否到期
// slave 不启动 ScheduleMessageService
public class ScheduleMessageService extends ConfigManager {

public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
...

// 保存每个级别延时多少时间
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

// 保存每个级别最后调度的消息在 Consume Queue 的位置
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32);

...

private Timer timer;

...

/**
 * Starts the delayed-message scheduling. Only the call that flips
 * {@code started} from false to true does any work; later calls return
 * immediately.
 *
 * Creates the daemon timer "ScheduleMessageTimerThread", schedules one
 * {@code DeliverDelayedMessageTimerTask} per configured delay level, and
 * schedules a fixed-rate task that persists this service's state.
 */
public void start() {
    if (!started.compareAndSet(false, true)) {
        return;
    }

    this.timer = new Timer("ScheduleMessageTimerThread", true);

    // One delivery task per delay level, resuming from the offset recorded for that level.
    for (Map.Entry<Integer, Long> levelEntry : this.delayLevelTable.entrySet()) {
        final Integer delayLevel = levelEntry.getKey();
        final Long delayMillis = levelEntry.getValue();
        final Long recordedOffset = this.offsetTable.get(delayLevel);
        // A level with no recorded offset starts from the beginning of its queue.
        final Long startOffset = (recordedOffset != null) ? recordedOffset : Long.valueOf(0L);

        if (delayMillis == null) {
            continue;
        }
        this.timer.schedule(new DeliverDelayedMessageTimerTask(delayLevel, startOffset), FIRST_DELAY_TIME);
    }

    // Periodically write the offset table to disk while the service is running.
    final TimerTask persistTask = new TimerTask() {
        @Override
        public void run() {
            try {
                if (!started.get()) {
                    return;
                }
                ScheduleMessageService.this.persist();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    };
    this.timer.scheduleAtFixedRate(persistTask, 10000,
        this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}

...

// One task handles the delayed messages of a single delay level.
class DeliverDelayedMessageTimerTask extends TimerTask {

    private final int delayLevel; // the delay level this task is responsible for
    private final long offset;    // consume-queue position this run starts scanning from

    /**
     * @param delayLevel the delay level whose queue this task scans
     * @param offset     the consume-queue offset to start scanning from
     */
    public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    /**
     * Runs one scan when the service is started. If the scan throws, the
     * failure is logged and an identical task (same level, same offset) is
     * scheduled again after DELAY_FOR_A_PERIOD.
     */
    @Override
    public void run() {
        try {
            if (isStarted()) {
                this.executeOnTimeup();
            }
        } catch (Exception e) {
            // XXX: warn and notify me
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    /**
     * Checks which messages in SCHEDULE_TOPIC_XXXX (for this task's level) are due.
     * Due messages are written back through the message store; the scan stops at the
     * first message that is not yet due or fails to be written. Every exit path
     * schedules a follow-up task, so this task never repeats by itself.
     */
    public void executeOnTimeup() {
        ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
        // Offset to retry from when the queue or its buffer is unavailable.
        long failScheduleOffset = offset;

        if (cq != null) {
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ != null) {
                try {
                    long nextOffset = offset;
                    int i = 0;
                    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();

                    // Walk every entry of the queue buffer, one fixed-size unit at a time
                    for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        long offsetPy = bufferCQ.getByteBuffer().getLong();
                        int sizePy = bufferCQ.getByteBuffer().getInt();
                        long tagsCode = bufferCQ.getByteBuffer().getLong();

                        if (cq.isExtAddr(tagsCode)) {
                            if (cq.getExt(tagsCode, cqExtUnit)) {
                                tagsCode = cqExtUnit.getTagsCode();
                            } else {
                                //can't find ext content.So re compute tags code.
                                log.error(...);
                                long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                            }
                        }

                        long now = System.currentTimeMillis();
                        long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); // timestamp at which the message is due

                        // Offset of the entry being examined; used by the reschedule paths inside
                        // the loop so a follow-up task resumes here instead of at the original offset.
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                        // deliverTimestamp <= now means the message is due
                        long countdown = deliverTimestamp - now;

                        if (countdown <= 0) {
                            MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

                            if (msgExt != null) {
                                try {
                                    /* Restore the actual MessageExtBrokerInner (topic, queueId, etc.) from msgExt
                                     */
                                    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);

                                    /* Write the message again (it goes to the commit log and the consume queue)
                                     */
                                    PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);

                                    // Written successfully: move on to the next message
                                    if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                        continue;
                                    } else {
                                        // XXX: warn and notify me
                                        log.error(...);
                                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);

                                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                        return;
                                    }
                                } catch (Exception e) {
                                    // Log the failure; the loop then proceeds to the next entry
                                    ...
                                }
                            }
                        } else { // else of if (countdown <= 0)
                            // Not due yet: come back exactly when this message becomes due
                            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
                            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                            return;
                        }
                    } // End of for

                    // Whole buffer processed: continue after the last entry
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    // Start checking again shortly
                    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                    return;
                } finally {
                    bufferCQ.release();
                }
            } else {
                // Log the error
                ...
            }
        } // End of if (cq != null)

        // Nothing could be scanned this round: schedule another attempt from the same offset
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
    }
}
}