【阅读笔记】rocketmq 特性实现 —— 消息事务

分析 rocketmq 消息事务如何实现

前言

消息事务将消息的发送拆分为两个阶段。

第一阶段是 prepare, producer 发送 prepare 消息到 broker,然后执行本地事务,

第二阶段是 confirm, producer 执行完本地事务后向 broker 发送 confirm,消息生效。

通过这种方式可以解决两个系统的分布式事务问题:A(存在DB操作)、B(存在DB操作)两方需要保证分布式事务一致性,通过引入中间层 MQ,A 和 MQ 保持事务一致性(异常情况下通过 MQ 反查 A 接口实现 check ),B 和 MQ 保证事务一致(通过重试),从而达到最终事务一致性。

原理

  1. producer 指定一个 TransactionListener 的实现类
  2. producer 发送消息(这是 prepare 消息)成功后,会执行 TransactionListener 的 executeLocalTransaction 方法执行本地事务
  3. executeLocalTransaction 执行成功后,返回 LocalTransactionState.COMMIT_MESSAGE 标志,然后发送 confirm 到 broker
  4. prepare 消息发送成功后, broker 会等待 confirm 消息,如果一定时间过去后还没收到,则到 producer 查询,这时会调用 TransactionListener 的 checkLocalTransaction 方法检查本地事务是否完成

prepare 消息其实是保存到了一个特殊的 topic,当消息提交后,再发回原来的 topic。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

public static void main(String[] args) throws Exception {

// 要用 TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("producer1");

// TransactionListenerImpl 执行本地事务
// 当 broker 检查事务情况时,用于检查本地事务是否完成
producer.setTransactionListener(new TransactionListenerImpl());

producer.setNamesrvAddr("127.0.0.1:9876");

producer.start();

Message msg = new Message(
"TopicTest",// topic
"TagA",// tag
("Transaction").getBytes(RemotingHelper.DEFAULT_CHARSET)// body
);

// 发送 prepare 消息
// 可以传一个参数
producer.sendMessageInTransaction(msg, null);
}


public class TransactionListenerImpl implements TransactionListener {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("TransactionListenerImpl#executeLocalTransaction");
return LocalTransactionState.COMMIT_MESSAGE;
}

// 检查事务
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("TransactionListenerImpl#checkLocalTransaction");
return LocalTransactionState.COMMIT_MESSAGE;
}
}

实现

producer 端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145

public class TransactionMQProducer extends DefaultMQProducer {
...

@Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}

return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
}

public class DefaultMQProducerImpl implements MQProducerInner {

public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {

// 获取用户设置的 TransactionListener
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);

// 给消息加入 PROPERTY_TRANSACTION_PREPARED 和 PROPERTY_PRODUCER_GROUP 属性
// PROPERTY_TRANSACTION_PREPARED 属性为 true 即 prepare 消息
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
// prepare 消息发送成功,执行本地事务
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}

// 执行本地事务
// localTransactionExecuter 是旧版本的代码,以后会去掉
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");

// 执行 TransactionListener 的 executeLocalTransaction 方法
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
// 发送不成功就不执行本地事务
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}

// 结束事务
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}


public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {

final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}

String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;

// 发送 END_TRANSACTION 命令
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
}
}

broker 端实现

处理 prepare 消息, 将消息保存到一个特殊的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {


private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
...

// 检查消息的 PROPERTY_TRANSACTION_PREPARED 属性是否为 true
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
// broker 可以禁用事务
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(".");
return response;
}

putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
}
}


// TransactionalMessageServiceImpl 是 org.apache.rocketmq.broker.transaction.queue 包的类
public class TransactionalMessageServiceImpl implements TransactionalMessageService {

@Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.putHalfMessage(messageInner);
}
}

public class TransactionalMessageBridge {

public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
// parseHalfMessageInner 对原消息做特殊处理
// putMessage 保存消息
return store.putMessage(parseHalfMessageInner(messageInner));
}

// 将消息做特殊处理
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 记录原 topic
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));

msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 设置 topic 为 RMQ_SYS_TRANS_HALF_TOPIC 这个特殊的 topic
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
}

处理 END_TRANSACTION 命令 (confirm)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

public class EndTransactionProcessor implements NettyRequestProcessor {

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {

final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.info("Transaction request:{}", requestHeader);

// SLAVE 不做这个
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}

...

OperationResult result = new OperationResult();

if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 其实是查出 prepare 消息的内容
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);

// 提交成功
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {

// 构造一个新的消息, endMessageTransaction 设置好新消息的内容同原消息一样
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());

// 保存新消息到 commit log
RemotingCommand sendResult = sendFinalMessage(msgInner);

// 清理 prepare 消息
// 往 RMQ_SYS_TRANS_OP_HALF_TOPIC 这个 topic 发一个消息,表示删除
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// 其实是查出 prepare 消息的内容
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);

if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);

// 清理 prepare 消息
// 同提交是一样的
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}

// 还是 UNKNOWN 就什么都不做
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
}

本地事务检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352

// broker 端负责询问 client 事务消息是否完成, broker 启动时启动
public class TransactionalMessageCheckService extends ServiceThread {

@Override
public void run() {
// 检查事务的间隔,默认 60s
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
}

@Override
protected void onWaitEnd() {

// 超时时间和最大检查次数
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();

// 检查事务
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
}
}

public class TransactionalMessageServiceImpl implements TransactionalMessageService {

@Override
public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {

// 从这个指定的 topic 取出 message queue
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}

// 循环对每个 queue 的消息检查事务
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();

// RMQ_SYS_TRANS_OP_HALF_TOPIC 这个 topic 下的 queue
MessageQueue opQueue = getOpQueue(messageQueue);
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);

log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset);
continue;
}

List<Long> doneOpOffset = new ArrayList<>();
// 已经被删除的 prepare 消息
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("...");
continue;
}

// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
while (true) {

// 处理时间太长,中断 while 循环
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}

if (removeMap.containsKey(i)) {
log.info("Half offset {} has been committed/rolled back", i);
removeMap.remove(i);
} else {

// 取得一个 half message
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();

if (msgExt == null) {
// 消息检查的次数过多,中断
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}

// 没 prepare 消息,不用检查
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("...");
break;
} else {

log.info("...");
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}

// 检查是否应该丢弃或跳过消息
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}

if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("...");
break;
}

long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("...");
break;
}
}

List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}

// 异步询问 clinet 事务状态
listener.resolveHalfMsg(msgExt);
} else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.info("...");
continue;
}
} // End of else

newOffset = i + 1;
i++;
} // End of while

if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
} // End of for
}
}

public abstract class AbstractTransactionalMessageCheckListener {
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}

public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);

String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);

// 在同组的 producer 中取一个
Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
if (channel != null) {
// 发送 CHECK_TRANSACTION_STATE 命令
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}
}

// client 端,处理来自 broker 的 CHECK_TRANSACTION_STATE 命令, 检查本地事务状态
public class ClientRemotingProcessor implements NettyRequestProcessor {

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {

switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
...
}
}

// 处理 borker 的事务查询
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {

final CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
}

final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
// 让指定 group 的 producer 才会处理,其实就是发送消息的 producer
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());

// 检查本地事务状态
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
} else {
log.warn("checkTransactionState, pick producer group failed");
}
} else {
log.warn("checkTransactionState, decode message failed");
}

return null;
}
}


public class DefaultMQProducerImpl implements MQProducerInner {

@Override
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {

// 提交一个 Runnable 到线程池
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

@Override
public void run() {
// TransactionCheckListener 将来会去掉
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();

if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");

// 调用客户代码检查事务
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}

// 发送 END_TRANSACTION 命令
this.processTransactionState(localTransactionState, group, exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}

// 处理事务查询结果
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {

final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);

String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());

switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}

String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}

try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};

this.checkExecutor.submit(request);
}
}