// Selects a suitable message queue (MQ).
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
...
// Sends a message synchronously; the async and one-way modes work in much the same way.
public SendResult send(Message msg)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
final long invokeID = random.nextLong(); // request ID
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
byte[] prevBody = msg.getBody();
try {
    // For MessageBatch, the ID has already been set in the generating process.
    if (!(msg instanceof MessageBatch)) {
        MessageClientIDSetter.setUniqID(msg);
    }
// Compress the msg body if needed.
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}
// Flag marking a transactional message.
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
// If a forbidden hook is registered, run it.
if (hasCheckForbiddenHook()) { ... }
// If a send-message hook is registered, run it.
if (hasSendMessageHook()) { ... }
case ONEWAY:
case SYNC:
    long costTimeSync = System.currentTimeMillis() - beginStartTime;
    if (timeout < costTimeSync) {
        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
    }
public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
/*
 * This method uses remotingClient to send a RemotingCommand to the broker,
 * completing the production of the message.
 */
public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
private void sendMessageAsync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final AtomicInteger times,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
// Asynchronous invocation.
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName){ // sendLatencyFaultEnable 默认是 false,可以用 DefaultMQProducer#setSendLatencyFaultEnable(true) 设置为 true if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0;