【阅读笔记】rocketmq 特性实现 —— 消息产生

通过源码分析 rocketmq 消息产生与消费如何实现

消息产生示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

/**
 * Minimal synchronous-send example: starts a producer, sends one message and shuts the
 * producer down again.
 *
 * @param args unused
 * @throws Exception if the producer cannot be started, the body cannot be encoded with the
 *                   requested charset, or the send fails (all of these are checked exceptions,
 *                   so they must be declared for the example to compile)
 */
public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer1"); // create a producer
    producer.setNamesrvAddr("127.0.0.1:9876"); // name server address
    producer.start(); // start the producer

    try {
        // Build the Message
        Message msg = new Message(
            "TopicTest", // topic
            "TagA", // tag
            ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) // body
        );

        // Send it to the broker
        producer.send(msg);
    } finally {
        // Always release the producer, even when building or sending the message fails.
        producer.shutdown();
    }
}

消息产生的实现

producer 端发送消息到 broker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446

/*
* Original author's notes on the type hierarchy (the referenced types are not shown here):
* - ClientConfig holds the configuration shared by clients, both producers and consumers.
* - MQProducer defines the producer interface: the various forms of send, plus start and shutdown.
* - MQProducer extends MQAdmin, which defines some administrative operations.
*/
public class DefaultMQProducer extends ClientConfig implements MQProducer {

// The object that actually implements the producer behaviour; DefaultMQProducer is a thin facade over it.
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

// Sends are forwarded to DefaultMQProducerImpl like this.
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}

...
}

public class DefaultMQProducerImpl implements MQProducerInner {

...

// client 实例
private MQClientInstance mQClientFactory;

// 选择合适的 MQ
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

...

// Synchronous send using the producer's configured send timeout.
// (Original author's note: the async and one-way variants are largely the same.)
public SendResult send(Message msg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

    final long configuredTimeout = this.defaultMQProducer.getSendMsgTimeout();
    return this.send(msg, configuredTimeout);
}

...

// Synchronous send with an explicit timeout: delegates to sendDefaultImpl in SYNC mode
// without a callback.
public SendResult send(Message msg, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

    final CommunicationMode mode = CommunicationMode.SYNC;
    final SendCallback noCallback = null; // SYNC sends pass no callback
    return this.sendDefaultImpl(msg, mode, noCallback, timeout);
}

// Core send loop (excerpt; "..." marks code elided by the article's author).
// Validates state and message, looks up the topic's publish info, then tries up to timesTotal
// times: pick a MessageQueue, send through sendKernelImpl, report the latency to
// updateFaultItem, and return according to the communication mode.
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

// Check the producer state and validate the message
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);

final long invokeID = random.nextLong(); // request ID
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
...

// Total number of attempts: SYNC gets 1 + retryTimesWhenSendFailed, every other mode gets exactly 1
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

...
for (; times < timesTotal; times++) {
// Broker used by the previous attempt; null on the first pass because mq is still null
String lastBrokerName = null == mq ? null : mq.getBrokerName();

// Pick a MessageQueue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
// Timeout handling: stop trying once the time spent so far exceeds the caller's timeout
beginTimestampPrev = System.currentTimeMillis();
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}

/*
* Send the message to the broker, passing on only the remaining time budget
*/
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

endTimestamp = System.currentTimeMillis();

// Report this attempt's latency to mqFaultStrategy (isolation = false)
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
// A non-OK status triggers another attempt only when retryAnotherBrokerWhenNotStoreOK is enabled;
// otherwise the result is returned as-is.
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (...) {
...
}
} else {
// No queue could be selected: give up retrying
break;
}
}

if (sendResult != null) {
return sendResult;
}

// Report the failure by throwing an exception
...
}

... // more error-handling code
} // End sendDefaultImpl


// Sends one message to the broker that owns the given MessageQueue (excerpt; "..." marks
// code elided by the article's author).
// Resolves the broker address, optionally compresses the body, builds the
// SendMessageRequestHeader and hands the request to MQClientAPIImpl.sendMessage.
// The message body is always restored to its original bytes in the finally block.
// Throws MQClientException when no address can be found for the queue's broker.
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();

// Look up the address of the broker (master) that owns this queue
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
// Original author's note: this refreshes the broker information from the name server; then look up again
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

SendMessageContext context = null;
if (brokerAddr != null) {
// Optionally switch to the VIP channel (a different port). Original author's note: this can
// leave a newer client unable to reach an older broker.
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

// Remember the uncompressed body so it can be restored later
byte[] prevBody = msg.getBody();
try {
// For a MessageBatch the ID has already been set while the batch was generated
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}

// Compress the message body if needed and record that in sysFlag
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}

// Flag transactional (prepared) messages
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

// Run the check-forbidden hook, if one is registered
if (hasCheckForbiddenHook()) {
...
}

// Run the send-message hook, if one is registered
if (hasSendMessageHook()) {
...
}

/* Build the request header for the send
*/
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
// For topics with the retry-group prefix, move the reconsume counters from the message
// properties into the request header and clear them from the message
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}

String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}

SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
// Fix for BUG https://github.com/apache/rocketmq-externals/issues/66
// When the body was compressed, send a clone (which carries the compressed body) and
// immediately restore the caller's message to its original body.
Message tmpMessage = msg;
if (msgBodyCompressed) {
tmpMessage = MessageAccessor.cloneMessage(msg);
msg.setBody(prevBody);
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}

// MQClientAPIImpl sends the command to the broker
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage, // note: tmpMessage here, not msg
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;

case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}

// MQClientAPIImpl sends the command to the broker
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg, // note: msg itself here
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}

// Run the send-message hook again after the send
if (this.hasSendMessageHook()) {
...
}

return sendResult;

} catch (...) {
...
} finally {
// Always restore the original (uncompressed) body on the caller's message
msg.setBody(prevBody);
}
}

// Reached only when no broker address could be resolved
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
} // End sendKernelImpl
}

// 封装一个 RemotingClient,client 端 RPC 都通过这个类调用
class MQClientAPIImpl {

// 是否发送简短版的消息头
private static boolean sendSmartMsg =
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));

// 远程通信
private final RemotingClient remotingClient;

...

// Convenience overload for sends that need no callback or retry context: forwards to the
// full sendMessage with a null callback, null topic publish info, null client instance and
// a retry count of 0.
public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {

    final SendCallback noCallback = null;
    final TopicPublishInfo noPublishInfo = null;
    final MQClientInstance noInstance = null;
    final int noRetries = 0;

    return this.sendMessage(
        addr,
        brokerName,
        msg,
        requestHeader,
        timeoutMillis,
        communicationMode,
        noCallback,
        noPublishInfo,
        noInstance,
        noRetries,
        context,
        producer);
}

/*
 * Builds the RemotingCommand for a send and dispatches it to the broker through
 * remotingClient, which completes the production of the message.
 *
 * ONEWAY: fires the request and returns null.
 * ASYNC:  hands the request to sendMessageAsync and returns null.
 * SYNC:   returns the SendResult produced by sendMessageSync.
 *
 * For ASYNC and SYNC the time already spent inside this method is subtracted from
 * timeoutMillis, and a RemotingTooMuchRequestException is thrown if the budget is
 * already used up.
 */
public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {

    final long enteredAt = System.currentTimeMillis();

    // Choose the request header form. Original author's note: SendMessageRequestHeaderV2 uses
    // shorter member names, so its serialized form is shorter as well.
    final boolean batch = msg instanceof MessageBatch;
    final RemotingCommand request;
    if (!sendSmartMsg && !batch) {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    } else {
        final SendMessageRequestHeaderV2 compactHeader =
            SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(
            batch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2,
            compactHeader);
    }

    // The message body travels as the request body
    request.setBody(msg.getBody());

    switch (communicationMode) {
        case ONEWAY: {
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        }
        case ASYNC: {
            final AtomicInteger attempts = new AtomicInteger();
            final long elapsed = System.currentTimeMillis() - enteredAt;
            if (elapsed > timeoutMillis) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            final long remaining = timeoutMillis - elapsed;
            this.sendMessageAsync(addr, brokerName, msg, remaining, request, sendCallback,
                topicPublishInfo, instance, retryTimesWhenSendFailed, attempts, context, producer);
            return null;
        }
        case SYNC: {
            final long elapsed = System.currentTimeMillis() - enteredAt;
            if (elapsed > timeoutMillis) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            final long remaining = timeoutMillis - elapsed;
            return this.sendMessageSync(addr, brokerName, msg, remaining, request);
        }
        default:
            assert false;
            return null;
    }
} // End sendMessage

// Performs a blocking request/response round trip with the broker and converts the
// response into a SendResult.
private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {

    final RemotingCommand brokerReply = this.remotingClient.invokeSync(addr, request, timeoutMillis);

    // Original author's note: a synchronous call always yields a response, even on errors
    assert brokerReply != null;

    // Turn the broker's response into the send result
    final SendResult result = this.processSendResponse(brokerName, msg, brokerReply);
    return result;
}

// Sends the request asynchronously and handles the outcome in an InvokeCallback
// (excerpt; "..." marks code elided by the article's author).
// With a response: it is converted by processSendResponse and sendCallback.onSuccess is called.
// Without a response: the broker is reported to the producer's fault tracking with isolation = true.
private void sendMessageAsync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final AtomicInteger times,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {

// Asynchronous invocation
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {

@Override
public void operationComplete(ResponseFuture responseFuture) {

// The broker's response (may be null)
RemotingCommand response = responseFuture.getResponseCommand();
if (null == sendCallback && response != null) {
// Handle the case where no sendCallback was supplied
...
}

if (response != null) {
try {
// Convert the response into a SendResult (original author's note: the send-message hook is invoked here)
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
...

// Invoke the success callback
try {
sendCallback.onSuccess(sendResult);
} catch (Throwable e) {
// NOTE(review): anything thrown by the user's callback is silently discarded here — confirm this is intentional
}

...
} catch (...) {
...
}
} else {
// response == null: either the send failed or waiting for the response timed out
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);

// Handle the different failure types; this may call sendMessageAsync again to retry
...
}
}
});
} // End sendMessageAsync
}

producer 端选择 broker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122

// When sending, the producer has to decide which queue a message goes to;
// DefaultMQProducerImpl delegates that decision to its mqFaultStrategy member.

public class DefaultMQProducerImpl implements MQProducerInner {

private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

SendResult sendDefaultImpl(...) {
...

MessageQueue mq = null;

// Retry loop
for (; times < timesTotal; times++) {
// lastBrokerName is non-null only on a retry, because mq is still null on the first pass
String lastBrokerName = null == mq ? null : mq.getBrokerName();

// Original author's note: a MessageQueue consists of brokerName, topic and queueId
MessageQueue mqSelected = selectOneMessageQueue(topicPublishInfo, lastBrokerName);

...

// (at this point: mq = mqSelected)
// endTimestamp: time the send completed
// beginTimestampPrev: time the send started
updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

...
}

...
}

// Queue selection is delegated to mqFaultStrategy
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

// Latency/fault reporting is delegated to mqFaultStrategy
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
}

public class MQFaultStrategy {

// LatencyFaultTolerance 用 ConcurrentHashMap 记录每个 broker 的发送时延
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
...

// Picks the MessageQueue for the next send attempt.
// With sendLatencyFaultEnable set, it scans the topic's queues for one whose broker is
// currently reported available by latencyFaultTolerance; failing that it falls back to the
// broker returned by pickOneAtLeast(), and finally to tpInfo.selectOneMessageQueue().
// With the flag unset, it delegates to tpInfo.selectOneMessageQueue(lastBrokerName).
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// Original author's note: sendLatencyFaultEnable defaults to false and can be enabled with
// DefaultMQProducer#setSendLatencyFaultEnable(true)
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
// Math.abs(Integer.MIN_VALUE) is still negative, so clamp the position
if (pos < 0)
pos = 0;

/* Take the next MessageQueue and return it if its broker is currently available.
* Original author's note: isAvailable returns (System.currentTimeMillis() - startTimestamp) >= 0,
* and startTimestamp is updated by updateFaultItem.
*/
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
// NOTE(review): on a retry (lastBrokerName != null) this only accepts a queue on the SAME broker
// as the previous attempt, which looks at odds with avoiding a broker that just failed —
// confirm against the upstream project before relying on it.
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}

// No queue was chosen above, so fall back to the broker suggested by pickOneAtLeast()
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
// NOTE(review): setBrokerName/setQueueId below modify the object returned by
// tpInfo.selectOneMessageQueue(); if that is the instance held in the queue list, shared
// state is being changed — confirm.
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
// The broker has no write queues for this topic: drop it from the tracker
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}

// Last resort: original author's note — picks the next queue based on TopicPublishInfo's sendWhichQueue member
return tpInfo.selectOneMessageQueue();
}

// Original author's notes on the non-fault-tolerant path:
// - lastBrokerName == null: pick a MessageQueue from messageQueueList, different from the one picked last time
// - lastBrokerName != null: the picked MessageQueue must be on a broker other than lastBrokerName
return tpInfo.selectOneMessageQueue(lastBrokerName);
}

// Reports the latency of a send to brokerName to the latency fault tracker.
// Does nothing unless sendLatencyFaultEnable is set. When isolation is true, the
// not-available duration is computed from a fixed latency of 30000 instead of currentLatency.
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (!this.sendLatencyFaultEnable) {
        return;
    }

    final long latencyForPenalty = isolation ? 30000 : currentLatency;
    final long duration = computeNotAvailableDuration(latencyForPenalty);

    // Original author's note (the tracker's internals are not shown here): the FaultItem keeps
    // currentLatency, and its startTimestamp becomes System.currentTimeMillis() + duration; until
    // that moment the broker should not be used and isAvailable returns false.
    latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}

/* Maps a latency to a not-available duration using the parallel arrays latencyMax and
 * notAvailableDuration: the highest threshold in latencyMax that currentLatency reaches
 * selects the duration at the same index; below every threshold the duration is 0.
 * Example: currentLatency >= 15000L yields 600000L (600 seconds).
 */
private long computeNotAvailableDuration(final long currentLatency) {
    int level = latencyMax.length;
    // Walk the thresholds from the largest down to the smallest
    while (--level >= 0) {
        if (latencyMax[level] <= currentLatency) {
            return this.notAvailableDuration[level];
        }
    }
    return 0;
}

...

}

broker 端存储消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

// Broker-side handler for send requests (excerpt; "..." marks code elided by the article's author).
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
// Entry point: parses the request header, runs the before/after send hooks and dispatches
// to the batch or single-message path.
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
...

// Original author's note: parses the headers of SEND_MESSAGE_V2 and SEND_BATCH_MESSAGE requests
// and converts them into the plain SEND_MESSAGE header form
SendMessageRequestHeader requestHeader = parseRequestHeader(request);

// Original author's note: the message context carries what the producer sent (message id,
// producerGroup, topic, ...) as well as values produced once the message is stored
// (queueOffset, bornTimeStamp, ...)
SendMessageContext mqtraceContext = buildMsgContext(ctx, requestHeader);

this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

RemotingCommand response;
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}

this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}

// Single-message path: builds a MessageExtBrokerInner from the request, puts it into the
// message store and turns the put result into the response.
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

// The response that goes back to the producer
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

...

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();

// Populate msgInner
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
msgInner.setBody(body);

// Remaining members omitted

...

// Put the message into the storage layer
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

/* Set the response code
* and return the response to the producer
*/
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
}