(1). 概述
前面对DefaultMQPullConsumer进行了简单的介绍,这一篇,直接绕过DefaultMQPullConsumer,而是直面底层的RPC调用,并模拟调用,目的在于,能更清晰的了解RocketMQ的功能,可以自由组合这些API.
(2). 自定义DefaultMQPullConsumer.pullBlockIfNotFound拉取消息
public class PullMessageTest {
public static void main(String[] args) throws Exception {
// TODO 应该要从 NameServer 读取
// brokder address
String addr = "10.0.6.82:10911";
String consumerGroup = "test";
// TODO 应该要从NameServer读取
String topic = "TopicTest";
// TODO 应该要从NameServer读取
int queueId = 1;
long offset = 0;
int maxNums = 10;
int sysFlagInner = 6;
long commitOffset = 0;
long brokerSuspendMaxTimeMillis = 20000;
String subExpression = "*";
long subVersion = 0;
String expressionType = "TAG";
// 创建拉取请求
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
// 创建请求
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
NettyClientConfig nettyClientConfig = new NettyClientConfig();
RemotingClient remotingClient = new NettyRemotingClient(nettyClientConfig);
remotingClient.start();
// 从Broder拉取消息,不需要NameServer信息
// remotingClient.updateNameServerAddressList(Arrays.asList("127.0.0.1:9876"));
// 发起远程请求
RemotingCommand response = remotingClient.invokeSync(addr, request, 1000 * 3);
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
switch (response.getCode()) {
case ResponseCode.SUCCESS:
// 如果有消息
pullStatus = PullStatus.FOUND;
break;
case ResponseCode.PULL_NOT_FOUND:
pullStatus = PullStatus.NO_NEW_MSG;
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
pullStatus = PullStatus.NO_MATCHED_MSG;
break;
case ResponseCode.PULL_OFFSET_MOVED:
pullStatus = PullStatus.OFFSET_ILLEGAL;
break;
default:
throw new MQBrokerException(response.getCode(), response.getRemark());
}
// 解码
PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response
.decodeCommandCustomHeader(PullMessageResponseHeader.class);
PullResultExt pullResult = new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(),
responseHeader.getMinOffset(), responseHeader.getMaxOffset(), null,
responseHeader.getSuggestWhichBrokerId(), response.getBody());
// 解码body
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResult.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
for (MessageExt msg : msgListFilterAgain) {
// 事务支持
String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
}
// 设置minOffset
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
Long.toString(pullResult.getMinOffset()));
// 设置maxOffset
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
}
pullResult.setMsgFoundList(msgListFilterAgain);
}
pullResult.setMessageBinary(null);
System.out.println(pullResult);
}
}