(1). 概述
前面把生产者和消费者都撸了遍源码,这一篇,主要介绍Broker,由于,Broker依赖于网络调用,所以,会比较麻烦,而RocketMQ写了大量的测试类,我们可以可以直接使用测试类来帮且我们辅助学习.
(2). DefaultMessageStoreTest
public class DefaultMessageStoreTest {
private final String StoreMessage = "Once, there was a chance for me!";
private int QUEUE_TOTAL = 100;
private AtomicInteger QueueId = new AtomicInteger(0);
private SocketAddress BornHost;
private SocketAddress StoreHost;
private byte[] MessageBody;
private MessageStore messageStore;
@Before
public void init() throws Exception {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
// 1. 构建消息存储对象(MessageStore )
messageStore = buildMessageStore();
// 4. 启动
boolean load = messageStore.load();
assertTrue(load);
// 6. 启动
messageStore.start();
}
}
(3). 构建DefaultMessageStore(DefaultMessageStoreTest)
private MessageStore buildMessageStore() throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
// 单个conmmitlog文件大小默认1GB
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
// 单个consumequeue文件大小默认30W*20表示单个Consumequeue文件中存储30W个ConsumeQueue
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10);
// 单个索引文件hash槽的个数,默认为五百万
messageStoreConfig.setMaxHashSlotNum(10000);
// 单个索引文件索引条目的个数,默认为两千万
messageStoreConfig.setMaxIndexNum(100 * 100);
// 同步刷盘
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
// consumuQueue文件刷盘频率(TimeUnit.MILLISECONDS)
messageStoreConfig.setFlushIntervalConsumeQueue(1);
// 2.创建:DefaultMessageStore
return new DefaultMessageStore( //
messageStoreConfig, // 消息存储配置
new BrokerStatsManager("simpleTest"), // 统计管理配置
new MyMessageArrivingListener(), // 消息保存成功后的回调
new BrokerConfig() // Brokder配置信息
);
}
// 消息刷盘成功之后回调地址.可实现消息的推送或者拉取(PullRequestHoldService)
private class MyMessageArrivingListener
implements MessageArrivingListener {
@Override
public void arriving(
String topic,
int queueId,
long logicOffset,
long tagsCode,
long msgStoreTime,
byte[] filterBitMap,
Map<String, String> properties) {
}
}
(4). DefaultMessageStore
package org.apache.rocketmq.store;
public class DefaultMessageStore
implements MessageStore {
// 消息存储的配置管理
private final MessageStoreConfig messageStoreConfig;
// commitlog日志管理
private final CommitLog commitLog;
// 主题对应的队列集合
private final ConcurrentMap<
String/* topic */,
ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
// consumequeue刷盘管理
private final FlushConsumeQueueService flushConsumeQueueService;
// 清除commitlog管理
private final CleanCommitLogService cleanCommitLogService;
// 清除consumerqueue管理
private final CleanConsumeQueueService cleanConsumeQueueService;
// 存储统计管理
private final StoreStatsService storeStatsService;
// 消息刷盘之后回调(Brokder角色非SLAVE,并且:longPollingEnable=true)
// 才会通知
private final MessageArrivingListener messageArrivingListener;
// Brokder配置管理
private final BrokerConfig brokerConfig;
// Brokder统计管理
private final BrokerStatsManager brokerStatsManager;
// MappendFile管理
private final AllocateMappedFileService allocateMappedFileService;
// consumerqueue刷盘
private final FlushConsumeQueueService flushConsumeQueueService;
// index文件管理
private final IndexService indexService;
// 消息刷盘控制(将会控制index/consumerqueue刷盘)
private final ReputMessageService reputMessageService;
// 高可用管理
private final HAService haService;
// 定时消息管理
private final ScheduleMessageService scheduleMessageService;
// 事务消息存储池
private final TransientStorePool transientStorePool;
// commitlog刷盘之后分发器
private final LinkedList<CommitLogDispatcher> dispatcherList;
// 加载文件
private RandomAccessFile lockFile;
// 3. DefaultMessageStore构造器
public DefaultMessageStore(
final MessageStoreConfig messageStoreConfig,
final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener,
final BrokerConfig brokerConfig ) throws IOException {
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
// 创建MappendFile管理器(专门负责创建commitlog文件)
this.allocateMappedFileService = new AllocateMappedFileService(this);
// commitlog管理
this.commitLog = new CommitLog(this);
// 主题与队列关系映射
this.consumeQueueTable = new ConcurrentHashMap<>(32);
// consumerQueue刷盘管理
this.flushConsumeQueueService = new FlushConsumeQueueService();
// 清除commitLog管理
this.cleanCommitLogService = new CleanCommitLogService();
// 清除consumequeue服务
this.cleanConsumeQueueService = new CleanConsumeQueueService();
// 存储统计
this.storeStatsService = new StoreStatsService();
// index文件管理
this.indexService = new IndexService(this);
// 高可用管理
this.haService = new HAService(this);
// commit消息分发器,根据CommitLog文件构建ConsumeQueue/IndexFile
this.reputMessageService = new ReputMessageService();
// 定时任务消息管理
this.scheduleMessageService = new ScheduleMessageService(this);
// 事务消息存储池(*),以后再细看
this.transientStorePool = new TransientStorePool(messageStoreConfig);
if (messageStoreConfig.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}
// MappedFile启动
this.allocateMappedFileService.start();
// IndexFile线程启动
this.indexService.start();
// CommitLog文件转发处理
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
// C:\Users\lixin\store\lock
File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
// 确保(创建) C:\Users\lixin\store 存在
MappedFile.ensureDirOK(file.getParent());
// C:\Users\lixin\store\lock 创建
lockFile = new RandomAccessFile(file, "rw");
}
}
(5). DefaultMessageStore
package org.apache.rocketmq.store;
public class DefaultMessageStore
implements MessageStore {
// 5. load
public boolean load() {
boolean result = true;
try {
// 查看abort文件是否存在
boolean lastExitOK = !this.isTempFileExist();
// true:代表正常关闭
// false:非正常关闭
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
// 定时消息管理服务启动
if (null != scheduleMessageService) {
// delayOffset.json
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
result = result && this.scheduleMessageService.load();
}
// load Commit Log
// C:\Users\lixin\store\commitlog
result = result && this.commitLog.load();
// load Consume Queue
// C:\Users\lixin\store\consumequeue
result = result && this.loadConsumeQueue();
if (result) {
// 创建checkpoint文件
// C:\Users\lixin\store\checkpoint
// MappedFile.OS_PAGE_SIZE = 4096
// fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);
// 物理消息时间
// this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
// 逻辑消息时间
// this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
// indexMsg消息时间
// this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
// 索引文件
// C:\Users\lixin\store\index
this.indexService.load(lastExitOK);
// 恢复(true)
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
// 启动没有成功的情况下,关闭MappedFile
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
} //end load
private void recover(final boolean lastExitOK) {
// 恢复consumequeue
this.recoverConsumeQueue();
if (lastExitOK) {
// 正常恢复comitlog
this.commitLog.recoverNormally();
} else {
this.commitLog.recoverAbnormally();
}
// 恢复 topicqueue
this.recoverTopicQueueTable();
}
private boolean isTempFileExist() {
// C:\Users\lixin\store\abort
String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
File file = new File(fileName);
return file.exists();
} //end isTempFileExist
}
(6). DefaultMessageStore
package org.apache.rocketmq.store;
public class DefaultMessageStore
implements MessageStore {
// 6. start
public void start() throws Exception {
lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
throw new RuntimeException("Lock failed,MQ already started");
}
// C:\Users\lixin\store\lock 写入文件
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);
// consumequeue 刷盘
this.flushConsumeQueueService.start();
// commit刷盘
this.commitLog.start();
// store 统计
this.storeStatsService.start();
//
if (this.scheduleMessageService != null &&
SLAVE != messageStoreConfig.getBrokerRole()) {
// 定时任务的消息
// C:\Users\lixin\store\config\delayOffset.json
this.scheduleMessageService.start();
}
if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
// commitLog.getMaxOffset() = 0
this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
// consumequeue/index刷盘线程
this.reputMessageService.start();
// 高可用(*)
this.haService.start();
// 创建:C:\Users\lixin\store\abort
this.createTempFile();
// 添加定时任务
this.addScheduleTask();
this.shutdown = false;
} //end start
private void addScheduleTask() {
// messageStoreConfig.getCleanResourceInterval() = 10000
// 每隔10秒执行一次清除文件
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
// 10分钟
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.checkSelf();
}
}, 1, 10, TimeUnit.MINUTES);
// 1秒钟(针对执行超过1秒以上的线程打印出线程信息)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
try {
if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
if (lockTime > 1000 && lockTime < 10000000) {
String stack = UtilAll.jstack();
final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
+ DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
MixAll.string2FileNotSafe(stack, fileName);
}
}
} catch (Exception e) {
}
}
}
}, 1, 1, TimeUnit.SECONDS);
} //end addScheduleTask
}
(7). DefaultMessageStoreTest
package org.apache.rocketmq.store;
public class DefaultMessageStoreTest {
@Test
public void testWriteAndRead() {
long totalMsgs = 10;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
for (long i = 0; i < totalMsgs; i++) {
// 7.创建消息
MessageExtBrokerInner msg = buildMessage();
// 8.保存
messageStore.putMessage(msg);
}
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = messageStore.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
result.release();
}
} //end testWriteAndRead
// 7. 创建一条消息
private MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("FooBar");
msg.setTags("TAG1");
msg.setKeys("Hello");
msg.setBody(MessageBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost);
return msg;
} //end buildMessage
}
(8). DefaultMessageStore
package org.apache.rocketmq.store;
public class DefaultMessageStore
implements MessageStore {
// 8.0 保存消息
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
if (this.shutdown) {
// 服务不可用
log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
// SLAVE角色保存消息失败
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
// 消息存储模式为:SLAVE,所以保存消息失败
log.warn("message store is slave mode, so putMessage is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
// 判断消息是否可写
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
// 设置打印时间
this.printTimes.set(0);
}
// 消息的主题(Topic)的长度不能大于:127字节
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
// 消息的属性不能超过127字节
if (msg.getPropertiesString() != null &&
msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
// 如果pagechage繁忙抛出错误
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
long beginTime = this.getSystemClock().now();
// **********************************************
// 开始提交消息
// 9. 提交消息
PutMessageResult result = this.commitLog.putMessage(msg);
long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
public boolean isOSPageCacheBusy() {
// commitlog上一次开始写入的时间锁
long begin = this.getCommitLog().getBeginTimeInLock();
// 当前时间 - 上一次时间
// diff = 1575533625764
long diff = this.systemClock.now() - begin;
// messageStoreConfig.getOsPageCacheBusyTimeOutMills() = 1000
// 两者都满足(代表pagechage有问题)
return
diff < 10000000 &&
diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
} //end isOSPageCacheBusy
}
(9). CommitLog
package org.apache.rocketmq.store;
public class CommitLog {
// 追回消息的回调
private final AppendMessageCallback appendMessageCallback;
// 构造器
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(
// C:\Users\lixin\store\commitlog
defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
// 10485760
// 前面调用:setMapedFileSizeConsumeQueue设置的
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
// org.apache.rocketmq.store.AllocateMappedFileService
defaultMessageStore.getAllocateMappedFileService()
);
this.defaultMessageStore = defaultMessageStore;
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 同步刷盘策略线程
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
//
this.commitLogService = new CommitRealTimeService();
// 追回消息回调
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
@Override
protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
// 10. 提交消息
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
// 设置消息的存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
// 设置消息Body crc32
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
// 存储状态管理服务
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
// 延迟消息处理
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
long eclipseTimeInLock = 0;
MappedFile unlockMappedFile = null;
// 第一次文件还未创建,mappedFile=null
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 加锁
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
// 1575622465960
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// 1575622465960 开始任务之前加锁
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
// 如果缓存中不存在文件,则创建文件
//******************************************************
// 11. 创建commit文件,在创建文件时,计算出下一个文件大小
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
// 18. 往MappedFile中追回消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
// 21. 判断消息是否写入成功
switch (result.getStatus()) {
case PUT_OK:
// 21.1
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
// 21.2 释放锁
putMessageLock.unlock();
}
if (eclipseTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
}
// false
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
// 包裹追加消息的返回结果
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 统计信息(主题信息自增/主题写入了多少字节)
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
// 22. 刷盘
handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
// 22 刷盘
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// 同步刷盘
// Synchronization flush
// defaultMessageStore.getMessageStoreConfig().getFlushDiskType() = SYNC_FLUSH
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// org.apache.rocketmq.store.CommitLog$GroupCommitService
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
// true
if (messageExt.isWaitStoreMsgOK()) {
// 22.1 创建CommitRequest请求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
// ********************************刷盘**********************************************
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
// 22.2 创建Commit服务
class GroupCommitService extends FlushCommitLogService {
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
// 22.2 添加Request请求
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);
}
}
req.wakeupCustomer(flushOK);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
@Override
protected void onWaitEnd() {
this.swapRequests();
}
@Override
public String getServiceName() {
return GroupCommitService.class.getSimpleName();
}
@Override
public long getJointime() {
return 1000 * 60 * 5;
}
} //end GroupCommitService
// ***************************AppendMessageCallback ************************
class DefaultAppendMessageCallback implements AppendMessageCallback {
// File at the end of the minimum fixed length empty
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
private final ByteBuffer msgIdMemory;
// Store the message content
private final ByteBuffer msgStoreItemMemory;
// The maximum length of the message
private final int maxMessageSize;
// Build Message Key
private final StringBuilder keyBuilder = new StringBuilder();
private final StringBuilder msgIdBuilder = new StringBuilder();
private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
DefaultAppendMessageCallback(final int size) {
this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
this.maxMessageSize = size;
}
public ByteBuffer getMsgStoreItemMemory() {
return msgStoreItemMemory;
}
// 18.4 默认的追回消息回调
public AppendMessageResult doAppend(
// 0
final long fileFromOffset,
// 临时申请的缓冲区(position == 上次写入的字节数),此处为:0
final ByteBuffer byteBuffer,
// 10485760(文件最大的位置)
final int maxBlank,
// 具体的消息内容
final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
// wroteOffset = 0
long wroteOffset = fileFromOffset + byteBuffer.position();
// 18.5
this.resetByteBuffer(hostHolder, 8);
// msgId = ip+port+wroteOffset
// 0A00065200001FBB0000000000000000
String msgId = MessageDecoder.createMessageId(
this.msgIdMemory,
// 18.5
msgInner.getStoreHostBytes(hostHolder),
wroteOffset );
// FooBar-0
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
// Topic + "-" + QueueId
// 主题名称+队列id
String key = keyBuilder.toString();
// 找到主题对应的offset
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
// 主题对应的queueOffset不存在的情况下
if (null == queueOffset) {
// { FooBar-0 = 0 }
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
// 获取消息的类型
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
/**
* Serialize message
*/
// 获取消息的properties
// propertiesData == nul;
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
// propertiesLength == 0
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
// 属性字节数不能 > 32767
// 0 > 32767
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
// Topic信息
// FooBar
// [70, 111, 111, 66, 97, 114]
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
// topicLength == 6
final int topicLength = topicData.length;
// bodyLength == 32
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
// ************************计算出消息的总长度********************************
// 18.6 调用消息长度.
// msgLen(129) == (32,6,0)
final int msgLen = calMsgLength(
// bodyLength = 32
bodyLength,
// topicLength = 6
topicLength,
// propertiesLength = 0
propertiesLength);
// Exceeds the maximum message
// 18.7 对消息的长度进行限制(消息长度不能 > 4m).
// maxMessageSize = 4194304
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// 生产者一次提交的消息,判断这条消息:是否有足够的磁盘空间
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE = 129
this.msgStoreItemMemory.putInt(msgLen);
// 固定的一个Magiccode
// 2 MAGICCODE = -626843481
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC = 151131488
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID[消息队列的ID] = 0
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG = 0
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// FooBar-0 = 0
// 队列的消息偏移量(注意:队列名称 = 主题名称-队列下标)
// 6 QUEUEOFFSET = 0
this.msgStoreItemMemory.putLong(queueOffset);
// commitLog的起始位置
// 7 PHYSICALOFFSET = 0
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG = 0
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP = 1575884746075
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST = [127, 0, 0, 1, 0, 0, 0, 0]
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP = 1575884746086
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS = [10, 0, 6, 82, 0, 0, 31, -69]
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES = 0
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset = 0
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY = 32
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC = 6
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES = 0
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
// 18. 创建追回消息的返回结果
AppendMessageResult result = new AppendMessageResult(
// PUT_OK
AppendMessageStatus.PUT_OK,
// 0
wroteOffset,
// 129
msgLen,
// 0A00065200001FBB0000000000000000
msgId,
// 1575884746086
msgInner.getStoreTimestamp(),
// 0
queueOffset,
// 310899
CommitLog.this.defaultMessageStore.now() - beginTimeMills
);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// 19.更新队列的偏移量(自增)
// The next update ConsumeQueue information
// 更新ConsumeQueue队列的偏移量
// FooBar-0 = 1
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBatch messageExtBatch) {
byteBuffer.mark();
//physical offset
long wroteOffset = fileFromOffset + byteBuffer.position();
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(messageExtBatch.getTopic());
keyBuilder.append('-');
keyBuilder.append(messageExtBatch.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
long beginQueueOffset = queueOffset;
int totalMsgLen = 0;
int msgNum = 0;
msgIdBuilder.setLength(0);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
this.resetByteBuffer(hostHolder, 8);
ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(hostHolder);
messagesByteBuff.mark();
while (messagesByteBuff.hasRemaining()) {
// 1 TOTALSIZE
final int msgPos = messagesByteBuff.position();
final int msgLen = messagesByteBuff.getInt();
final int bodyLen = msgLen - 40; //only for log, just estimate it
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
totalMsgLen += msgLen;
// Determines whether there is sufficient free space
if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, 8);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
//ignore previous read
messagesByteBuff.reset();
// Here the length of the specially set maxBlank
byteBuffer.reset(); //ignore the previous appended messages
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(),
beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
//move to add queue offset and commitlog offset
messagesByteBuff.position(msgPos + 20);
messagesByteBuff.putLong(queueOffset);
messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen);
storeHostBytes.rewind();
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
if (msgIdBuilder.length() > 0) {
msgIdBuilder.append(',').append(msgId);
} else {
msgIdBuilder.append(msgId);
}
queueOffset++;
msgNum++;
messagesByteBuff.position(msgPos + msgLen);
}
messagesByteBuff.position(0);
messagesByteBuff.limit(totalMsgLen);
byteBuffer.put(messagesByteBuff);
messageExtBatch.setEncodedBuff(null);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(),
messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
result.setMsgNum(msgNum);
CommitLog.this.topicQueueTable.put(key, queueOffset);
return result;
}
private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
byteBuffer.flip();
byteBuffer.limit(limit);
}
} //end AppendMessageCallback
// ************************************计算消息的长度*******************************
// 18.6.1 计算消息的总长度
private static int calMsgLength(
// bodyLength = 32
int bodyLength,
// topicLength = 6
int topicLength,
// propertiesLength = 0
int propertiesLength) {
// TOTALSIZE = 整个消息的长度(用于界定消息的边界)[4字节]
// MAGICCODE = 魔数[4字节]
// BODYCRC = 消息体crc32校验码[4字节]
// QUEUEID = 消息消费队列ID[4字节]
// FLAG = RocketMQ不做处理,预留给应用程序使用[4字节]
// QUEUEOFFSET = 消息在消息队列的偏移量[8字节]
// PHYSICALOFFSET = 消息在CommitLog的偏移量[8字节]
// SYSFLAG = 消息系统Flag(是否压缩/是否支持事务)[4字节]
// BORNTIMESTAMP = 生产者调用消息发送API的时间[8字节]
// BORNHOST = 生产者IP和端口
// STORETIMESTAMP = 消息存储时间[8字节]
// STOREHOSTADDRESS = Brokder服务器的IP+端口[8字节]
// RECONSUMETIMES = 消息重试次数[4字节]
// Prepared Transaction Offset = 事务消息物理偏移量[8字节]
// BODY = 消息体内容的长度[4字节]
// TOPIC = 消息主题的长度[1字节]
// propertiesLength = 消息属性长度[2字节],消息属性长度不能超过65536个字符
//
final int msgLen = 4 //TOTALSIZE
+ 4 //MAGICCODE
+ 4 //BODYCRC
+ 4 //QUEUEID
+ 4 //FLAG
+ 8 //QUEUEOFFSET
+ 8 //PHYSICALOFFSET
+ 4 //SYSFLAG
+ 8 //BORNTIMESTAMP
+ 8 //BORNHOST
+ 8 //STORETIMESTAMP
+ 8 //STOREHOSTADDRESS
+ 4 //RECONSUMETIMES
+ 8 //Prepared Transaction Offset
+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
+ 1 + topicLength //TOPIC
+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
+ 0;
return msgLen;
}
}
(10). MappedFileQueue
package org.apache.rocketmq.store;
public class MappedFileQueue {
// commit文件列表
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
// 构造器
public MappedFileQueue(
// C:\Users\lixin\store\commitlog
final String storePath,
// 10485760
int mappedFileSize,
// org.apache.rocketmq.store.AllocateMappedFileService
// 分配MappedFile管理
AllocateMappedFileService allocateMappedFileService) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.allocateMappedFileService = allocateMappedFileService;
}
// 12. 获取最后一个MappedFile文件
public MappedFile getLastMappedFile(final long startOffset) {
// 12.1 startOffset = 0
return getLastMappedFile(startOffset, true);
}
// 12.2
public MappedFile getLastMappedFile(
// 0
final long startOffset,
// true
boolean needCreate) {
long createOffset = -1;
// 12.3 获取最后一个MappedFile
// null
MappedFile mappedFileLast = getLastMappedFile();
// 如果获取MappedFile,代表是第一次请求
if (mappedFileLast == null) {
// 0 = 0 - 0;
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
// 如果最后一个文件存在,则计算出下一个文件的起始offset
if (mappedFileLast != null && mappedFileLast.isFull()) {
// 假设:mappedFileLast.getFileFromOffset() = 00000000000010485760
// 假设:mappedFileSize = 10485760
// createOffset = 20971520
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
//
if (createOffset != -1 && needCreate) {
// C:\Users\lixin\store\commitlog\00000000000000000000
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
// C:\Users\lixin\store\commitlog\00000000000010485760
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
// 有指定MappedFile分配服务
if (this.allocateMappedFileService != null) {
//******************************************************
// 13. 委托给:AllocateMappedFileService创建文件
// 交给MappedFile创建MappedFile
mappedFile = this.allocateMappedFileService
.putRequestAndReturnMappedFile(
// C:\Users\lixin\store\commitlog\00000000000000000000
nextFilePath,
// C:\Users\lixin\store\commitlog\00000000000010485760
nextNextFilePath,
// 10485760(1G)
this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
// 17. 异步创建任务成功
if (mappedFile != null) { //true
if (this.mappedFiles.isEmpty()) { // true
mappedFile.setFirstCreateInQueue(true);
}
// 17.1 添加到映射文件集合
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
// 12.3 获取最后一个commit文件
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
// 判断mappedFiles集合不为空则获取集合中最后一个文件
while (!this.mappedFiles.isEmpty()) { //false
try {
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
}
(11). AllocateMappedFileService
package org.apache.rocketmq.store;
// ServiceThread implements Runnable
public class AllocateMappedFileService extends ServiceThread {
private static int waitTimeOut = 1000 * 5;
private ConcurrentMap<String, AllocateRequest> requestTable
= new ConcurrentHashMap<String, AllocateRequest>();
private PriorityBlockingQueue<AllocateRequest> requestQueue
= new PriorityBlockingQueue<AllocateRequest>();
private volatile boolean hasException = false;
private DefaultMessageStore messageStore;
// DefaultMessageStore
public AllocateMappedFileService(DefaultMessageStore messageStore) {
this.messageStore = messageStore;
}
// 14. 添加请求并且返回:MappedFile
public MappedFile putRequestAndReturnMappedFile(
// C:\Users\lixin\store\commitlog\00000000000000000000
String nextFilePath,
// C:\Users\lixin\store\commitlog\00000000000010485760
String nextNextFilePath,
// 10485760
int fileSize) {
int canSubmitRequests = 2;
// false
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
}
}
// 14.1 创建分配请求
// nextFilePath = C:\Users\lixin\store\commitlog\00000000000000000000
// fileSize = 10485760
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
// 如果map中不存在:nextFilePath这个key,则put进去,并返回:true
// 如果map中存在:nextFilePath这个key,则返回:false
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
// true
if (nextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextFilePath);
return null;
}
// 往队列里进行添加
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
canSubmitRequests--;
}
// C:\Users\lixin\store\commitlog\00000000000010485760
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
// true
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}
if (hasException) {
// 如果有错误,抛出异常
log.warn(this.getServiceName() + " service has exception. so return null");
return null;
}
// 14.2 获取结果:C:\Users\lixin\store\commitlog\00000000000000000000
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
// 16. 获得CountdownLatch,并等待watiTimeOut毫秒
boolean waitOK = result.getCountDownLatch()
.await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) { // true
// 提示创建mmap文件超时
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
// 从map中移除
// 16.1 并返回:MappedFile
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
} //end putRequestAndReturnMappedFile
public void run() {
log.info(this.getServiceName() + " service started");
while ( !this.isStopped()
// 15. 将文件映射进内存
&& this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}
// 15.1 将文件映射进内存
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
// 等待从队列中弹出数据(阻塞线程往下执行)
req = this.requestQueue.take();
// 分配请求
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}
// true
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
// false
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 15.2 创建:MappedFile
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
if (eclipseTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}
// 15.4
// mappedFile.getFileSize() == 10485760
// this.messageStore.getMessageStoreConfig().getMapedFileSizeCommitLog() == 10485760
// false
// messageStore.getMessageStoreConfig().isWarmMapedFileEnable() == 15.4
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMapedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
// 设置 AllocateRequest 对象的MappedFile
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
// 设置有异常
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
// 设置有异常
this.hasException = true;
if (null != req) {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
// 15.5 设置请求完成
req.getCountDownLatch().countDown();
}
return true;
}
static class AllocateRequest implements Comparable<AllocateRequest> {
// Full file path
private String filePath;
private int fileSize;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile MappedFile mappedFile = null;
public AllocateRequest(String filePath, int fileSize) {
this.filePath = filePath;
this.fileSize = fileSize;
}
public String getFilePath() {
return filePath;
}
public void setFilePath(String filePath) {
this.filePath = filePath;
}
public int getFileSize() {
return fileSize;
}
public void setFileSize(int fileSize) {
this.fileSize = fileSize;
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public MappedFile getMappedFile() {
return mappedFile;
}
public void setMappedFile(MappedFile mappedFile) {
this.mappedFile = mappedFile;
}
} //end AllocateRequest
}
(12). MappedFile
package org.apache.rocketmq.store;
public class MappedFile extends ReferenceResource {
// 总的虚拟内存大小
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// 文件个数
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
//
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
protected final AtomicInteger committedPosition = new AtomicInteger(0);
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// 文件名称
private String fileName;
// 文件的起始位置
private long fileFromOffset;
// 文件
private File file;
// 文件大小
protected int fileSize;
private MappedByteBuffer mappedByteBuffer;
protected FileChannel fileChannel;
// 15.2 创建MappedFile
public MappedFile(final String fileName, final int fileSize) throws IOException {
init(fileName, fileSize);
}
// 15.3 初始化方法
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
// C:\Users\lixin\store\commitlog\00000000000000000000
this.file = new File(fileName);
// fileFromOffset(0) = Long.parseLong("00000000000000000000")
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
// 递归创建目录:C:\Users\lixin\store\commitlog
ensureDirOK(this.file.getParent());
try {
// 创建文件channel
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
// RandomAccessFile获得的Channel可以开启任意三种模式(MapMode)
// InputStream获得的Channel只能开启:MapMode.READ_ONLY
// 将fileChannel通道与内存进行映射
// MapMode.READ_WRITE : 可读写操作
// position(0) : 文件的起始位置
// size(10485760) : size字节
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
// fileSize = 10485760
// 设置总的内存
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
// 总的Mapped文件数
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
// 如果不成功的情况下,关闭流
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
} //end init
// 18.1 追加消息
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
return appendMessagesInner(msg, cb);
} //end appendMessage
// 18.2 追加消息
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
// 18.3 获取当前的位置
// currentPos == 0
int currentPos = this.wrotePosition.get();
// 0 < 10485760
if (currentPos < this.fileSize) {
// writeBuffer == null
// mappedByteBuffer.slice()返回原ByteBuffer的一个镜像,所有改变互相可见.position和limit独立.
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// 设置position = 0
byteBuffer.position(currentPos);
// 定义结果集
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) { // 内联消息
// *****************追加消息*******************************
// 18.4 追回消息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) { // 批量消息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// *******************重点************************
// 20 记录消息上次写的位置,下次从这个位置开始追加消息
// DefaultMessageStore.FlushConsumeQueueService线程每隔一秒
// 实现对consumeQueue/Index进行刷盘.
this.wrotePosition.addAndGet(result.getWroteBytes());
// 获得写入的时间
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
} //end appendMessagesInner
}
(13). MessageExt
public class MessageExt extends Message {
// 18.5 获得IP地址写入到:byteBuffer
public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) {
return socketAddress2ByteBuffer(this.storeHost, byteBuffer);
}
// 18.5.1 获得IP地址写入到:byteBuffer
public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
// [10],[0],[6],[82]
byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
// int 4字节
// [8123]
byteBuffer.putInt(inetSocketAddress.getPort());
byteBuffer.flip();
return byteBuffer;
}
}
(14). MessageDecoder
public class MessageDecoder {
//
public static String createMessageId(
//
final ByteBuffer input,
final ByteBuffer addr,
final long offset) {
input.flip();input.limit(MessageDecoder.MSG_ID_LENGTH);
// [10, 0, 6, 82, 0, 0, 31, -69]
input.put(addr);
// long 8字节
// offset = 0
// [10, 0, 6, 82, 0, 0, 31, -69, 0, 0, 0, 0, 0, 0, 0, 0]
input.putLong(offset);
// 0A00065200001FBB0000000000000000
return UtilAll.bytes2string(input.array());
}
}
(15). MappedFile 写文件,同时记录:wrotePosition = 上次写的位置
public class MappedFile extends ReferenceResource {
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
// ......
// *****************************重点****************
this.wrotePosition.addAndGet(result.getWroteBytes());
// *****************************重点****************
// ......
}
// ......
}
}
(16). 定时任务:DefaultMessageStore.ReputMessageService 每隔一秒钟刷盘(写consumeQueue/Index).
public class DefaultMessageStore implements MessageStore {
// ReputMessageService 负责对(ConsumeQueue/Index)进行刷盘
class ReputMessageService extends ServiceThread {
public void run() {
while (!this.isStopped()) {
try {
// 休眠一秒再进行业务处理
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
// ...
}
}
} //end run
// 判断是否提交日志
private boolean isCommitLogAvailable() {
return
this.reputFromOffset <
// this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
DefaultMessageStore.this.commitLog.getMaxOffset();
}
private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
// ...
}
} //end doReput
}
}
(17). RoketMQ消息协议( QUEUEOFFSET:为每增加一个条消息,都会进行自增 )
TOTALSIZE = 整个消息的长度(用于界定消息的边界)[4字节]
MAGICCODE = 魔数[4字节]
BODYCRC = 消息体crc32校验码[4字节]
QUEUEID = 消息消费队列ID[4字节]
FLAG = RocketMQ不做处理,预留给应用程序使用[4字节]
QUEUEOFFSET = 消息在消息队列的偏移量(consumeQueue)[8字节]
PHYSICALOFFSET = 消息在CommitLog的偏移量[8字节]
SYSFLAG = 消息系统Flag(是否压缩/是否支持事务)[4字节]
BORNTIMESTAMP = 生产者调用消息发送API的时间[8字节]
BORNHOST = 生产者IP和端口
STORETIMESTAMP = 消息存储时间[8字节]
STOREHOSTADDRESS = Brokder服务器的IP+端口[8字节]
RECONSUMETIMES = 消息重试次数[4字节]
Prepared Transaction Offset = 事务消息物理偏移量[8字节]
BODY = 消息体内容的长度[4字节]
TOPIC = 消息主题的长度[1字节]
propertiesLength = 消息属性长度[2字节],消息属性长度不能超过65536个字符
(18). ConsumeQueue 数据结构
offset = 消息在CommitLog的偏移量
size = 消息的字节数
tagsCode = 消息的标记(*)