(1). 概述

前面,通过运行脚本,测试运行了6个Bookies,在这里,我们开始剖析源码,让自己更深入的了解:Bookies是如何构建出来的.

(2). 配置远程断点

# 为了支持远程断点,需要修改运行脚本内容:  /Users/lixin/GitRepository/bookkeeper/bin/bookkeeper

elif [ ${COMMAND} == "localbookie" ]; then
  NUMBER=$1
  shift
  # 增加断点,通过IDEA进行远程调试
  OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 ${OPTS}"
  # *******************************************************
  # 通过这一行,我们能看出来,localbookie调用的是:LocalBookKeeper类
  # *******************************************************
  exec "${JAVA}" ${OPTS} ${JMX_ARGS} -Dzookeeper.4lw.commands.whitelist='*' org.apache.bookkeeper.util.LocalBookKeeper ${NUMBER} ${BOOKIE_CONF} $@

(3). LocalBookKeeper.main

public static void main(String[] args) {
	// 要运行的Bookies数量和配置文件路径
	// args = {
	// 	6,
	//  /Users/lixin/GitRepository/bookkeeper/conf/bk_server.conf
	// }
	try { 
		//  没有参数的情况下退出
		if (args.length < 1) {
			usage();
			System.exit(-1);
		}

        // 解析出第一个参数:6
		int numBookies = 0;
		try {
			numBookies = Integer.parseInt(args[0]);
		} catch (NumberFormatException nfe) {
			LOG.error("Unrecognized number-of-bookies: {}", args[0]);
			usage();
			System.exit(-1);
		}
        
		// ********************************************************************
		// 配置文件转化成业务模型:ServerConfiguration
		// ********************************************************************
		ServerConfiguration conf = new ServerConfiguration();
		conf.setAllowLoopback(true);
		if (args.length >= 2) {
			// /Users/lixin/GitRepository/bookkeeper/conf/bk_server.conf
			String confFile = args[1];
			try {
				// ********************************************************************
				// 解析出配置文件到业务模型:ServerConfiguration
				// ********************************************************************
				conf.loadConf(new File(confFile).toURI().toURL());
				LOG.info("Using configuration file {}", confFile);
			} catch (Exception e) {
				// load conf failed
				LOG.warn("Error loading configuration file {}", confFile, e);
			}
		}
		
		// 第3个参数可以配置zk数据目录
		String zkDataDir = null;
		if (args.length >= 3) {
			zkDataDir = args[2];
		}
		
		// 默认的:BookiesConfig目录
		// defaultLocalBookiesConfigDir = /tmp/localbookies-config
		String localBookiesConfigDirName = defaultLocalBookiesConfigDir;
		if (args.length >= 4) {
			localBookiesConfigDirName = args[3];
		}

		// ******************************************************************************
		// 在单个jvm进程里启动6个Bookies
		// ******************************************************************************
		startLocalBookiesInternal(conf, zooKeeperDefaultHost, zooKeeperDefaultPort, numBookies, true,
				bookieDefaultInitialPort, false, "test", zkDataDir, localBookiesConfigDirName);
	} catch (Exception e) {
		LOG.error("Exiting LocalBookKeeper because of exception in main method", e);
		System.exit(-1);
	}
}

(4). LocalBookKeeper.startLocalBookiesInternal

static void startLocalBookiesInternal(
			//
			ServerConfiguration conf,
			// 127.0.0.1
			String zkHost,
			// 2181
			int zkPort,
			// 6
			int numBookies,
			// true
			boolean shouldStartZK,
			// 5000
			int initialBookiePort,
			// false
			boolean stopOnExit,
			// test
			String dirSuffix,
			// null
			String zkDataDir,
			// /tmp/localbookies-config
			String localBookiesConfigDirName)
            throws Exception {
	
	conf.setMetadataServiceUri(
			// 设置元数据保存的URL地址
			// zk+null://127.0.0.1:2181/ledgers
			newMetadataServiceUri(
					zkHost,
					zkPort,
					conf.getLedgerManagerLayoutStringFromFactoryClass(),
					conf.getZkLedgersRootPath()));
	
	// ****************************************************************************
	// 创建:LocalBookKeeper
	// ****************************************************************************
	LocalBookKeeper lb = new LocalBookKeeper(numBookies, initialBookiePort, conf, localBookiesConfigDirName);
	ZooKeeperServerShim zks = null;
	File zkTmpDir = null;
	List<File> bkTmpDirs = null;
	try {
		if (shouldStartZK) { // true
			File zkDataDirFile = null;
			// zk数据目录
			if (zkDataDir != null) { // false
				zkDataDirFile = new File(zkDataDir);
				if (zkDataDirFile.exists() && zkDataDirFile.isFile()) {
					throw new IOException("Unable to create zkDataDir, since there is a file at "
							+ zkDataDirFile.getAbsolutePath());
				}
				if (!zkDataDirFile.exists() && !zkDataDirFile.mkdirs()) {
					throw new IOException("Unable to create zkDataDir - " + zkDataDirFile.getAbsolutePath());
				}
			}
			
			// *****************************************************************************
			// 创建ZK数据的临时目录(/var/folders/l2/v7kxnww15mjb9sps4yb25sqh0000gn/T/zookeeper1221935550339855498test)
			// *****************************************************************************
			zkTmpDir = IOUtils.createTempDir("zookeeper", dirSuffix, zkDataDirFile);
			// 如果文件目录存在,则删除.
			zkTmpDir.deleteOnExit();
			
			// ********************************************************************
			// 值得学习:内部实际是构建出来了一个zookeeper.
			// ********************************************************************
			zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir);
		}
		
		// 此时ZK数据目录内容如下:
		// lixin-macbook:zookeeper1221935550339855498test lixin$ tree /var/folders/l2/v7kxnww15mjb9sps4yb25sqh0000gn/T/zookeeper1221935550339855498test
		// /var/folders/l2/v7kxnww15mjb9sps4yb25sqh0000gn/T/zookeeper1221935550339855498test
		// └── version-2
		//    ├── log.1
		//    └── snapshot.0
		
		// 调用:LocalBookKeeper.initializeZookeeper方法,对ZK进行Node的初始化(会在ZK中创建以下三个Node)
		// /ledgers
		// /ledgers/available/
		// /ledgers/available/readonly
		lb.initializeZookeeper(zkHost, zkPort);
		
		// ***************************************************************
		// 开始运行:Bookies
		// ***************************************************************
		bkTmpDirs = lb.runBookies(dirSuffix);

		try {
			while (true) {
				Thread.sleep(5000);
			}
		} catch (InterruptedException ie) {
			Thread.currentThread().interrupt();
			if (stopOnExit) {
				lb.shutdownBookies();

				if (null != zks) {
					zks.stop();
				}
			}
			throw ie;
		}
	} catch (Exception e) {
		LOG.error("Failed to run {} bookies : zk ensemble = '{}:{}'",
				numBookies, zkHost, zkPort, e);
		throw e;
	} finally {
		if (stopOnExit) {
			if (null != bkTmpDirs) {
				cleanupDirectories(bkTmpDirs);
			}
			if (null != zkTmpDir) {
				FileUtils.deleteDirectory(zkTmpDir);
			}
		}
	} // end finally
}

(5). LocalBookKeeper.runBookies

private List<File> runBookies(String dirSuffix)
            throws Exception {
	List<File> tempDirs = new ArrayList<>();
	try {
		// dirSuffix = "test"
		// **********************************************************
		// 委托给了另一个重载的方法:runBookies
		// **********************************************************
		runBookies(tempDirs, dirSuffix);
		return tempDirs;
	} catch (Exception ioe) {
		cleanupDirectories(tempDirs);
		throw ioe;
	}
} // end runBookies

(6). LocalBookKeeper.runBookies

private void runBookies(List<File> tempDirs, String dirSuffix)
            throws Exception {
	LOG.info("Starting Bookie(s)");
	// Create Bookie Servers (B1, B2, B3)
	
	// 定义存放WAL日志的数组
	journalDirs = new File[numberOfBookies];
	// 定义BookieServer数组
	bs = new BookieServer[numberOfBookies];
	// 定义BookieServer相应的配置(ServerConfiguration)数组
	bsConfs = new ServerConfiguration[numberOfBookies];
	
	// localBookiesConfigDir = /tmp/localbookies-config,存放的是每个:BookieServer对应的配置文件.
	// 验证目录(/tmp/localbookies-config)
	if (localBookiesConfigDir.exists() && localBookiesConfigDir.isFile()) {
		throw new IOException("Unable to create LocalBookiesConfigDir, since there is a file at "
				+ localBookiesConfigDir.getAbsolutePath());
	}
	
	// 创建目录(/tmp/localbookies-config)
	if (!localBookiesConfigDir.exists() && !localBookiesConfigDir.mkdirs()) {
		throw new IOException(
				"Unable to create LocalBookiesConfigDir - " + localBookiesConfigDir.getAbsolutePath());
	}

	
	for (int i = 0; i < numberOfBookies; i++) {
		
		// 如果没有指定WAL目录,则创建一个临时的
		if (null == baseConf.getJournalDirNameWithoutDefault()) { // true
			//  /var/folders/l2/v7kxnww15mjb9sps4yb25sqh0000gn/T/localbookkeeper03300424156333625743test
			journalDirs[i] = IOUtils.createTempDir("localbookkeeper" + Integer.toString(i), dirSuffix);
			tempDirs.add(journalDirs[i]);
		} else {
			journalDirs[i] = new File(baseConf.getJournalDirName(), "bookie" + Integer.toString(i));
		}
		
		// 检查下新创建的:WAL目录是否存在
		if (journalDirs[i].exists()) { // true
			// 判断是否为目录
			if (journalDirs[i].isDirectory()) { // true
				// 重新删除目录
				FileUtils.deleteDirectory(journalDirs[i]);
			} else if (!journalDirs[i].delete()) {
				throw new IOException("Couldn't cleanup bookie journal dir " + journalDirs[i]);
			}
		}
		
		// 重新创建目录,如果创建不成功的话,就抛出异常,这样做的目的是要代码当前进程对磁盘有读写的权限.
		if (!journalDirs[i].mkdirs()) {
			throw new IOException("Couldn't create bookie journal dir " + journalDirs[i]);
		}

		// 数据目录
		// ledgerDirs = /tmp/bk-data
		String [] ledgerDirs = baseConf.getLedgerDirWithoutDefault();
		
		if ((null == ledgerDirs) || (0 == ledgerDirs.length)) { // false
			ledgerDirs = new String[] { journalDirs[i].getPath() };
		} else {
			// 循环遍历,创建目录
			for (int l = 0; l < ledgerDirs.length; l++) {
				// /tmp/bk-data/bookie0
				// /tmp/bk-data/bookie1
				// /tmp/bk-data/bookie2
				// /tmp/bk-data/bookie3
				// /tmp/bk-data/bookie4
				// /tmp/bk-data/bookie5
				File dir = new File(ledgerDirs[l], "bookie" + Integer.toString(i));
				
				if (dir.exists()) {
					if (dir.isDirectory()) {
						FileUtils.deleteDirectory(dir);
					} else if (!dir.delete()) {
						throw new IOException("Couldn't cleanup bookie ledger dir " + dir);
					}
				}
				if (!dir.mkdirs()) {
					throw new IOException("Couldn't create bookie ledger dir " + dir);
				}
				ledgerDirs[l] = dir.getPath();
			}
		}

		// clone一份配置文件.
		bsConfs[i] = new ServerConfiguration((ServerConfiguration) baseConf.clone());

		// initialPort = 5000
		PortManager.initPort(initialPort);
		if (0 == initialPort) { // false
			bsConfs[i].setBookiePort(0);
		} else { //true
			// 通过nextFreePort产生一个随机可用的端口,并设置到配置对象(ServerConfiguration)里.
			bsConfs[i].setBookiePort(PortManager.nextFreePort());
		}
		
		
		if (null == baseConf.getMetadataServiceUriUnchecked()) { // false
			bsConfs[i].setMetadataServiceUri(baseConf.getMetadataServiceUri());
		}

		// journalDirs[i].getPath() = /var/folders/l2/v7kxnww15mjb9sps4yb25sqh0000gn/T/localbookkeeper03300424156333625743test
		bsConfs[i].setJournalDirName(journalDirs[i].getPath());
		// ledgerDirs = /tmp/bk-data/bookieN
		bsConfs[i].setLedgerDirNames(ledgerDirs);
		
		// write config into file before start so we can know what's wrong if start failed
		// fileName = 172.17.14.140:5000.conf
		String fileName = BookieImpl.getBookieId(bsConfs[i]).toString() + ".conf";
		// 把配置内容序列化到本地磁盘上(/tmp/localbookies-config/172.17.14.140:5000.conf)
		serializeLocalBookieConfig(bsConfs[i], fileName);
		
		// 典型的发布与订阅模型
		// Mimic BookKeeper Main
		final ComponentInfoPublisher componentInfoPublisher = new ComponentInfoPublisher();
		final Supplier<BookieServiceInfo> bookieServiceInfoProvider = () -> buildBookieServiceInfo(componentInfoPublisher);
		
		// ******************************************************************************
		// 看了半天,好像这才是重点,创建:BookieService对象
		// ******************************************************************************
		BookieService bookieService = new BookieService(new BookieConfiguration(bsConfs[i]),
				NullStatsLogger.INSTANCE,
				bookieServiceInfoProvider
		);
		
		bs[i] = bookieService.getServer();
		// 发布事件
		bookieService.publishInfo(componentInfoPublisher);
		componentInfoPublisher.startupFinished();
		// 启动:BookieService
		bookieService.start();
	}

	// 创建:/tmp/localbookies-config/baseconf.conf,存放基础配置
	// /tmp/localbookies-config
	// lixin-macbook:localbookies-config lixin$ cat baseconf.conf
	// allowLoopback=true
	// bookiePort=3181
	// extraServerComponents=
	// httpServerEnabled=false
	// httpServerPort=8080
	// httpServerClass=org.apache.bookkeeper.http.vertx.VertxHttpServer
	// journalDirectories=/tmp/bk-txn
	// ledgerDirectories=/tmp/bk-data
	// zkServers=localhost:2181
	// zkTimeout=10000
	// zkEnableSecurity=false
	// storageserver.grpc.port=4181
	// dlog.bkcEnsembleSize=3
	// dlog.bkcWriteQuorumSize=2
	// dlog.bkcAckQuorumSize=2
	// storage.range.store.dirs=data/bookkeeper/ranges
	// storage.serve.readonly.tables=false
	// storage.cluster.controller.schedule.interval.ms=30000
	// metadataServiceUri=zk+null://127.0.0.1:2181/ledgers
	ServerConfiguration baseConfWithCorrectZKServers = new ServerConfiguration(
			(ServerConfiguration) baseConf.clone());
	if (null == baseConf.getMetadataServiceUriUnchecked()) {
		baseConfWithCorrectZKServers.setMetadataServiceUri(baseConf.getMetadataServiceUri());
	}
	serializeLocalBookieConfig(baseConfWithCorrectZKServers, "baseconf.conf");
}

(7). 总结

本来是想找到调用Bookeeper操作日志的API来着的,结果,看完源码后,才发现:LocalBookKeeper的目的是构建N个Bookies(BookieService)容器,并启动.底层用到了:VertxHttpServer.