【阅读笔记】rocketmq 消息存储服务 (五)

描述 rocketmq store 模块如何提供服务

简介

DefaultMessageStore 把 CommitLog、ConsumeQueue、Index 以及其他服务组织在一起:一方面为上层的 broker 提供存储服务,另一方面通过常驻的后台线程完成存储模块的其他工作。

DefaultMessageStore 代码分析


/**
 * DefaultMessageStore ties together the CommitLog, the ConsumeQueues, the index and the
 * background services of the store module, and serves the broker on top of them.
 * (Excerpt: only the field declarations are shown; the constructor and methods are omitted.)
 */
public class DefaultMessageStore implements MessageStore {

private final MessageStoreConfig messageStoreConfig; // store-related configuration

private final CommitLog commitLog;

/* Consume queue table: topic -> (queueId -> ConsumeQueue), e.g.
topic1
1 -> ConsumeQueue
2 -> ConsumeQueue
3 -> ConsumeQueue
4 -> ConsumeQueue
topic2
1 -> ConsumeQueue
2 -> ConsumeQueue
3 -> ConsumeQueue
4 -> ConsumeQueue
*/
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

// Resident thread that periodically flushes every consume queue in consumeQueueTable.
// The period is controlled by flushIntervalConsumeQueue (default 1000 ms per these notes).
private final FlushConsumeQueueService flushConsumeQueueService;

// Deletes expired commit log files. The original notes say this happens when the disk is full;
// NOTE(review): confirm the exact trigger conditions in CleanCommitLogService
private final CleanCommitLogService cleanCommitLogService;

// Deletes expired consume queue files
private final CleanConsumeQueueService cleanConsumeQueueService;

// Index service
private final IndexService indexService;

// Creates mapped files ahead of time (pre-allocation / warm-up service)
private final AllocateMappedFileService allocateMappedFileService;

// After a message is stored in the commit log, this service dispatches it to the
// consume queue and the index, which store the related information
private final ReputMessageService reputMessageService;

// HA service. The original notes say it is used here to update the master address;
// NOTE(review): presumably also responsible for master/slave replication - confirm
private final HAService haService;

// Handles delayed-delivery messages
private final ScheduleMessageService scheduleMessageService;

// Statistics service
private final StoreStatsService storeStatsService;

// Pool of ByteBuffers used by MappedFile
private final TransientStorePool transientStorePool;

private final RunningFlags runningFlags = new RunningFlags();
private final SystemClock systemClock = new SystemClock();

// Single-threaded scheduler that runs periodic tasks
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("StoreScheduledThread"));

private final BrokerStatsManager brokerStatsManager;

// Long-polling support: notifies the listener when messages arrive
private final MessageArrivingListener messageArrivingListener;

// Broker configuration
private final BrokerConfig brokerConfig;

private volatile boolean shutdown = true;

private StoreCheckpoint storeCheckpoint;

private AtomicLong printTimes = new AtomicLong(0);

// CommitLogDispatchers handle the event "a message has been written to the commit log".
// Two implementations: CommitLogDispatcherBuildConsumeQueue (writes the consume queue)
// and CommitLogDispatcherBuildIndex (builds the index)
private final LinkedList<CommitLogDispatcher> dispatcherList;

// File lock that prevents two stores from using the same directory as storePath
private RandomAccessFile lockFile;
private FileLock lock;

// NOTE(review): presumably records whether the previous shutdown was normal - confirm
boolean shutDownNormal = false;
}

写消息


/**
 * Stores one message (excerpt: validation and result handling are elided with "...").
 * Appends the message to the commit log and records the elapsed time in the store statistics.
 * Dispatching to the consume queue and the index is done afterwards by ReputMessageService.
 *
 * @param msg the message to store
 * @return the result of the commit log append (return statement elided in this excerpt)
 */
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
...

long beginTime = this.getSystemClock().now();

// Append the message to the commit log
PutMessageResult result = this.commitLog.putMessage(msg);

// "eclipse" here means "elapsed"
long eclipseTime = this.getSystemClock().now() - beginTime;
...
// Report the elapsed time; by its name the stats service presumably keeps the maximum
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
}

...

// Resident background thread: dispatches entries newly written to the commit log
// to the consume queues and the index (excerpt: parts elided with "...").
class ReputMessageService extends ServiceThread {

// Commit log offset up to which entries have been dispatched ("reput")
private volatile long reputFromOffset = 0;

// Restores reputFromOffset at startup
public void setReputFromOffset(long reputFromOffset) {
this.reputFromOffset = reputFromOffset;
}

// Main loop: sleep 1 ms, then dispatch whatever new data is available, until stopped
public void run() {
...
while (!this.isStopped()) {
...
Thread.sleep(1);
this.doReput();
}
...
}

// Dispatches every complete entry between reputFromOffset and the readable end of the commit log
private void doReput() {
...
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

...

// getData returns the data from reputFromOffset up to the read position of the MappedFile
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
...
this.reputFromOffset = result.getStartOffset();

// Walk the returned buffer message by message
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

if (dispatchRequest.isSuccess()) {
if (size > 0) {
// Call dispatch on every CommitLogDispatcher in dispatcherList,
// i.e. write the consume queue and the index
DefaultMessageStore.this.doDispatch(dispatchRequest);

// Pseudo-condition from the notes: "not a slave && long polling enabled"
if (不是 slave && 长轮询打开) {
// Notify that a message has arrived so that pending long-poll requests can return
DefaultMessageStore.this.messageArrivingListener.arriving(...);
}

this.reputFromOffset += size;
readSize += size;

// Pseudo-condition from the notes: "not a slave"
if (不是 slave) {
// Per-topic put statistics: message count and total size
DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());
}

} else if (size == 0) {
// size == 0 is treated as the end of the current commit log file:
// move to the next file and end the inner loop over this buffer
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) { // equivalent to a plain else
...
}
}
}
}
}
}

读消息


/* Reads messages from a logical queue (excerpt: parts are elided with "...").
* First reads each message's position in the commit log from the consume queue,
* then reads the message itself from the commit log.
* Reads at most maxMsgNums messages, starting at logical offset `offset` of queue (topic, queueId).
* When messageFilter is non-null it is applied twice: to the consume queue entry and to the full message.
* The result carries the status, the next begin offset and the queue's min/max offsets.
*/
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {

...

// Holds the result
GetMessageResult getResult = new GetMessageResult();

// Current max offset of the commit log
final long maxOffsetPy = this.commitLog.getMaxOffset();

ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); // find the consume queue
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();

// Check the requested offset against the queue bounds; only the last branch reads messages
if (maxOffset == 0) {
...
} else if (offset < minOffset) {
...
} else if (offset == maxOffset) {
...
} else if (offset > maxOffset) {
...
} else {
// Consume queue entries starting at the requested logical offset
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {

...

int i = 0;
// Upper bound, in bytes, on how much of the consume queue buffer is scanned:
// at least 16000, or maxMsgNums entries' worth
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();

// Each entry read here is: commit log offset (long), message size (int), tags code (long)
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // start position of the message in the commit log
int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // size of the message
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // tags code, passed to the filter below

maxPhyOffsetPulling = offsetPy;

// After a commit log file turned out to be missing (see below), skip entries that still point into it
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset) {
continue;
}
}

// Whether the message at offsetPy is considered to be on disk rather than in memory;
// used to size the batch. NOTE(review): the exact criterion lives in checkInDiskByCommitOffset
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

// Stop once a full batch has been read
if (isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {
break;
}

...

// First filter pass, using only consume queue data (tags code / ext unit;
// isTagsCodeLegal and extRet are computed in the elided code above)
if (messageFilter != null &&
!messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {

if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}

continue;
}

// Read the message from the commit log
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
// No data at this position (presumably the file was already removed)
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}

nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); // continue from the next file
continue;
}

// Second filter pass, against the full message read from the commit log
if (messageFilter != null &&
!messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {

if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}

// Release the buffer of the filtered-out message
selectResult.release();
continue;
}

this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();

getResult.addMessage(selectResult); // add the message to the result
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}

...

// Suggest pulling from a slave when the distance between the commit log max offset and the
// last pulled offset exceeds accessMessageInMemoryMaxRatio percent of physical memory
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);

} finally {
// Always release the consume queue buffer
bufferConsumeQueue.release();
}
} else {
...
}
}
} else {
// Consume queue not found
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}

...

getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}


/**
 * Searches stored messages by key, using the index service.
 *
 * At most three index lookups are made. Each lookup asks the index for up to {@code maxNum}
 * commit log offsets of messages with the given topic/key in the time range
 * [{@code begin}, upper], where upper starts at {@code end}. After a lookup, upper is moved to
 * the store timestamp of the message with the smallest commit log offset in that batch.
 * The search stops as soon as any message data has been collected, when a lookup returns no
 * offsets, or when the upper bound has dropped below {@code begin}.
 *
 * @param topic  topic to search
 * @param key    message key to search for
 * @param maxNum maximum number of offsets requested from the index per lookup
 * @param begin  start of the time range (timestamp)
 * @param end    end of the time range (timestamp)
 * @return the messages found, together with the index's last-update offset and timestamp
 */
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
    QueryMessageResult queryMessageResult = new QueryMessageResult();

    long lastQueryMsgTime = end;

    for (int i = 0; i < 3; i++) {
        // Look up candidate commit log offsets (a List<Long>) in the index.
        QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
        if (queryOffsetResult.getPhyOffsets().isEmpty()) {
            break;
        }

        // Ascending order: the first offset belongs to the oldest message of this batch.
        Collections.sort(queryOffsetResult.getPhyOffsets());

        // Note the lower-case "offset" in these accessor names.
        queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
        queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());

        for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
            long offset = queryOffsetResult.getPhyOffsets().get(m);

            try {
                if (0 == m) {
                    // Decode only the oldest message: its store timestamp becomes the upper
                    // bound of the next lookup. Key matching is no longer done here, so
                    // decoding the other messages would be wasted work.
                    MessageExt msg = this.lookMessageByOffset(offset);
                    if (null == msg) {
                        // Nothing readable at this offset: skip it explicitly instead of
                        // dereferencing null and catching the resulting exception.
                        log.warn("queryMessage: no message at commit log offset {}, {} {}", offset, topic, key);
                        continue;
                    }
                    lastQueryMsgTime = msg.getStoreTimestamp();
                }

                // getData returns all data from offset onwards; a stored message starts with
                // its total size, so trim the buffer to exactly one message.
                SelectMappedBufferResult result = this.commitLog.getData(offset, false);
                if (result != null) {
                    int size = result.getByteBuffer().getInt(0);
                    result.getByteBuffer().limit(size);
                    result.setSize(size);
                    queryMessageResult.addMessage(result);
                }
            } catch (Exception e) {
                log.error("queryMessage exception", e);
            }
        }

        if (queryMessageResult.getBufferTotalSize() > 0) {
            break;
        }

        if (lastQueryMsgTime < begin) {
            break;
        }
    }

    return queryMessageResult;
}

缓存池

用于 commit log 的读写。由于内存映射文件的页面不一定驻留在物理内存中,改用通过 mlock 锁定在物理内存里的直接内存(由 ByteBuffer.allocateDirect 分配)做缓冲,可以提高读写性能。


/**
 * Pool of direct ByteBuffers used by MappedFile. Each buffer is as large as one commit log
 * file and is locked into physical memory with mlock when the pool is initialized.
 */
public class TransientStorePool {

    // Number of buffers in the pool (transientStorePoolSize, default 5).
    private final int poolSize;
    // Capacity of each buffer: the commit log file size (default 1G).
    private final int fileSize;
    // Buffers currently available to borrow. Returned buffers are pushed to the front,
    // and borrowing takes from the front.
    private final Deque<ByteBuffer> availableBuffers;
    // Store configuration; read for the two sizes above and for the pool-enable flag.
    private final MessageStoreConfig storeConfig;

    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.availableBuffers = new ConcurrentLinkedDeque<>();
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMapedFileSizeCommitLog();
    }

    /** Allocates poolSize direct buffers of fileSize bytes, mlocks each one and adds it to the pool. */
    public void init() {
        int created = 0;
        while (created < poolSize) {
            ByteBuffer fresh = ByteBuffer.allocateDirect(fileSize);
            LibC.INSTANCE.mlock(pointerTo(fresh), new NativeLong(fileSize));
            availableBuffers.offer(fresh);
            created++;
        }
    }

    /** Releases the memory lock (munlock) of every buffer currently held by the pool. */
    public void destroy() {
        for (ByteBuffer pooled : availableBuffers) {
            LibC.INSTANCE.munlock(pointerTo(pooled), new NativeLong(fileSize));
        }
    }

    /**
     * Gives a buffer back to the pool. Its position is reset to 0 and its limit to fileSize,
     * so the next borrower can use it directly.
     */
    public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }

    /**
     * Takes a buffer from the front of the pool, or null if none is available.
     * Logs a warning when fewer than 40% of poolSize buffers remain.
     */
    public ByteBuffer borrowBuffer() {
        final ByteBuffer borrowed = availableBuffers.pollFirst();
        final boolean runningLow = availableBuffers.size() < poolSize * 0.4;
        if (runningLow) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return borrowed;
    }

    /**
     * Number of buffers available to borrow, or Integer.MAX_VALUE when the pool is disabled.
     * A value of 0 may trigger sendMessage flow control.
     */
    public int remainBufferNumbs() {
        return storeConfig.isTransientStorePoolEnable() ? availableBuffers.size() : Integer.MAX_VALUE;
    }

    // Wraps the native address of a direct buffer in a Pointer for the LibC calls.
    private static Pointer pointerTo(ByteBuffer buffer) {
        final long address = ((DirectBuffer) buffer).address();
        return new Pointer(address);
    }
}

文件预热

在后台线程中预先为下一个(以及下下个)文件创建 MappedFile;若开启了 warmMapedFileEnable,还会对新文件做预热,提前把文件加载到内存中。


public class AllocateMappedFileService extends ServiceThread {

private ConcurrentMap<String, AllocateRequest> requestTable = new ConcurrentHashMap<>();

private PriorityBlockingQueue<AllocateRequest> requestQueue = new PriorityBlockingQueue<>();

private volatile boolean hasException = false;

private DefaultMessageStore messageStore;

...

/**
 * Submits pre-allocation requests for the next file and for the file after it. It then waits,
 * up to waitTimeOut ms, for the service thread to create the next file.
 *
 * @param nextFilePath     path of the file that is needed now
 * @param nextNextFilePath path of the following file, pre-allocated ahead of time when possible
 * @param fileSize         size of each file
 * @return the MappedFile for nextFilePath, or null in any of these cases:
 *         its request could not be submitted, the service thread has reported an exception,
 *         creation timed out, the request was not found, or the wait was interrupted
 */
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    int canSubmitRequests = 2;
    if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        // If the broker is a slave, don't fail fast even when there is no buffer left in the pool.
        if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
            && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) {
            // Limit submissions to the pooled buffers not already claimed by queued requests
            // (presumably one buffer per file created with the pool).
            canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
        }
    }

    // Register a request for the next file, unless one is already registered.
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

    if (nextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn(...); // cannot be served right now
            this.requestTable.remove(nextFilePath);
            return null;
        }

        boolean offerOK = this.requestQueue.offer(nextReq); // enqueue for the service thread
        if (!offerOK) {
            log.warn(...); // not expected: PriorityBlockingQueue is unbounded
        }
        canSubmitRequests--;
    }

    // Same for the file after next; this one is only a best-effort pre-allocation.
    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;

    if (nextNextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn(...); // cannot be pre-allocated right now
            // Undo only our own registration for nextNextFilePath. Removing nextFilePath here
            // would drop the request the caller is about to wait for, so the lookup below
            // would fail. It would also leave this never-queued request stuck in requestTable.
            this.requestTable.remove(nextNextFilePath);
            // No return here: the caller still needs nextFilePath.
        } else {
            boolean offerOK = this.requestQueue.offer(nextNextReq);
            if (!offerOK) {
                log.warn(...); // not expected: PriorityBlockingQueue is unbounded
            }
        }
    }

    if (hasException) {
        log.warn(this.getServiceName() + " service has exception. so return null");
        return null;
    }

    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        if (result != null) {
            // Wait for the service thread to create the file and count down the latch.
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
            if (!waitOK) {
                log.warn(...);
                return null;
            } else {
                this.requestTable.remove(nextFilePath);
                return result.getMappedFile(); // the created MappedFile
            }
        } else {
            log.error(...);
        }
    } catch (InterruptedException e) {
        log.warn(...);
    }

    return null;
}

...

/**
 * Service thread body. Processes allocation requests one at a time until the service is
 * stopped or mmapOperation() reports that the thread should exit.
 */
public void run() {
    log.info(this.getServiceName() + " service started");

    for (;;) {
        if (this.isStopped() || !this.mmapOperation()) {
            break;
        }
    }

    log.info(this.getServiceName() + " service end");
}

/**
 * Handles one allocation request. It takes the request from requestQueue, creates its
 * MappedFile (optionally warming it up), and wakes the thread waiting for it.
 *
 * @return false when interrupted while waiting for a request, which makes run() exit;
 *         true otherwise, including after an IOException (the request is then re-queued)
 */
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;

try {
req = this.requestQueue.take(); // take a request from the queue, blocking until one is available

AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());

// The request is no longer registered in requestTable (e.g. its submitter removed it): skip it
if (null == expectedRequest) {
log.warn(...)
return true;
}

// Should not happen: requestTable holds a different request for the same file path
if (expectedRequest != req) {
log.warn(...)
return true;
}

// Only create the file if this request has not been fulfilled yet
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();

// Create the MappedFile object
MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next(); // load a MappedFile implementation via ServiceLoader; per the notes, open-source and closed-source implementations may differ
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
// No provider found (iterator().next() throws NoSuchElementException), or another
// RuntimeException: fall back to the default implementation
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}

// "eclipse" means "elapsed": warn when creating the file took more than 10 ms
long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
if (eclipseTime > 10) {
log.warn(...)
}

// Warm the file up (the notes describe this as loading it into memory). Only done for files
// at least as large as a commit log file, and only when warmMapedFileEnable is set
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()
&& this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {

mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());

}

// Publish the result for the waiting thread
req.setMappedFile(mappedFile);

this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) { // interrupted while waiting: flag it and stop the run loop
log.warn(...)
this.hasException = true;
return false;
} catch (IOException e) {
// On an IO error, put the request back on the queue so that it is retried
log.warn(...)
this.hasException = true;
if (null != req) {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
// best-effort 1 ms back-off before the retry; interruption is ignored here
}
}
} finally {
// Count down the latch to wake the thread waiting in putRequestAndReturnMappedFile.
// Only done on success; otherwise that thread's timed await eventually gives up.
if (req != null && isSuccess)
req.getCountDownLatch().countDown();
}
return true;


}