(1). DefaultMQPullConsumer 的继承关系
org.apache.rocketmq.client.MQAdmin
org.apache.rocketmq.client.consumer.MQConsumer
org.apache.rocketmq.client.consumer.MQPullConsumer
org.apache.rocketmq.client.consumer.DefaultMQPullConsumer
(2). 查看MQAdmin职责
# 创建Topic
void createTopic(final String key,
final String newTopic,
final int queueNum)
throws MQClientException;
# 创建Topic
void createTopic(String key,
String newTopic,
int queueNum,
int topicSysFlag)
throws MQClientException;
# 搜索offset
long searchOffset(final MessageQueue mq,
final long timestamp) throws MQClientException;
# 查询最大的offset
long maxOffset(final MessageQueue mq)
throws MQClientException;
# 查询最小的offset
long minOffset(final MessageQueue mq)
throws MQClientException;
# 查询最早的存储时间
long earliestMsgStoreTime(final MessageQueue mq)
throws MQClientException;
# 根据offsetMsgId查看消息信息
MessageExt viewMessage(final String offsetMsgId)
throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
# 根据topic和msgid获得消息
MessageExt viewMessage(String topic,
String msgId)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
# 查询消息(根据topic/key/maxNum/begin/end)
QueryResult queryMessage(final String topic,
final String key,
final int maxNum,
final long begin,
final long end)
throws MQClientException, InterruptedException;
(3). 查看MQConsumer的职责
# 发送消息
void sendMessageBack(final MessageExt msg,
final int delayLevel,
final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
# 根据Topic获取消息
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic)
throws MQClientException;
(4). 查看MQPullConsumer的职责
# 拉取消息
PullResult pull(final MessageQueue mq,
final String subExpression,
final long offset,
final int maxNums)
throws MQClientException, RemotingException, MQBrokerException,InterruptedException;
(5). 查看DefaultMQPullConsumer的职责
# DefaultMQPullConsumer 将所有的请求委托给:DefaultMQPullConsumerImpl进行处理
public class DefaultMQPullConsumer extends
ClientConfig implements MQPullConsumer {
protected final transient
DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
}
(6). 查看MQConsumerInner的职责
# 消费者组名称
String groupName();
# 消息模式(BROADCASTING/CLUSTERING)
# BROADCASTING:消费位置存储在Client(LocalFileOffsetStore)
# CLUSTERING:存储在远端(RemoteBrokerOffsetStore)
MessageModel messageModel();
# 消费者类型(PULL/PUSH)
ConsumeType consumeType();
# CONSUME_FROM_LAST_OFFSET : 默认策略,从该队列最尾开始消费,即跳过历史消息.
# CONSUME_FROM_FIRST_OFFSET : 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
# CONSUME_FROM_TIMESTAMP : 从某个时间点开始消费,和setConsumeTimestamp()配合使用,
# 默认是半个小时以前
ConsumeFromWhere consumeFromWhere();
# 订阅了哪些主题
Set<SubscriptionData> subscriptions();
# 重新负载均衡下
void doRebalance();
# 持久化consumer的消费位置
void persistConsumerOffset();
# 更新topic订阅信息
void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info);
# 是否订阅更新
boolean isSubscribeTopicNeedUpdate(final String topic);
boolean isUnitMode();
# 查看消费者运行状态
ConsumerRunningInfo consumerRunningInfo();
(7). 以获取Topic路由信息为案例
public class MQClientAPIImpl {
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
break;
}
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
}
(8). 总结
阿里的代码有一个习惯,那就是:基本上是面向接口编程,然后,接口的职责比较明细.