(1). 概述
前面对DefaultMQPullConsumer进行了简单的介绍,这一篇,直接绕过DefaultMQPullConsumer,而是直面底层的RPC调用,并模拟调用,目的在于,能更清晰的了解RocketMQ的功能,可以自由组合这些API.
(2). 生产消息
package help.lixin.example;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class SendMessageTest {
public static void main(String[] args) throws Exception {
String brokerAddr = "10.0.6.82:10911";
String brokerName = "k-161019-04-PC";
NettyClientConfig nettyClientConfig = new NettyClientConfig();
NettyRemotingClient remotingClient = new NettyRemotingClient(nettyClientConfig);
remotingClient.start();
Message msg = new Message("TopicTest", "TagE", "id-" + String.valueOf(0),
(0 + " - TagE Hello,World B!!!").getBytes(RemotingHelper.DEFAULT_CHARSET));
MessageClientIDSetter.setUniqID(msg);
Map<String, String> properties =msg.getProperties();
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup("ProducerGroup");
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic("AUTO_CREATE_TOPIC_KEY");
requestHeader.setDefaultTopicQueueNums(4);
requestHeader.setQueueId(5);
requestHeader.setSysFlag(0);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(0);
requestHeader.setProperties(MessageDecoder.messageProperties2String(properties));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(Boolean.FALSE);
requestHeader.setBatch(Boolean.FALSE);
SendResult sendResult = null;
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2
.createSendMessageRequestHeaderV2(requestHeader);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
request.setBody(msg.getBody());
RemotingCommand response = remotingClient.invokeSync(brokerAddr, request, 1000 * 30);
if (response.getCode() == ResponseCode.SUCCESS) {
SendStatus sendStatus = SendStatus.SEND_OK;
SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response
.decodeCommandCustomHeader(SendMessageResponseHeader.class);
MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
sendResult = new SendResult(sendStatus, uniqMsgId, responseHeader.getMsgId(), messageQueue,
responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
if (regionId == null || regionId.isEmpty()) {
regionId = MixAll.DEFAULT_TRACE_REGION_ID;
}
if (traceOn != null && traceOn.equals("false")) {
sendResult.setTraceOn(false);
} else {
sendResult.setTraceOn(true);
}
sendResult.setRegionId(regionId);
}
}
}