(1). 概述

上一篇对SnapshotWriter的源码进行了剖析,在这篇,对SnapshotReader进行剖析(相比SnapshotWriter,SnapshotReader还是比较复杂一点)

(2). Snapshot UML图解

"Snapshot UML图解"

(3). LocalSnapshotReader构造器

public class LocalSnapshotReader extends SnapshotReader {

    private static final Logger          LOG = LoggerFactory.getLogger(LocalSnapshotReader.class);

    /** Generated reader id*/
    private long                         readerId;
    /** remote peer addr */
    private final Endpoint               addr;
    private final LocalSnapshotMetaTable metaTable;
    private final String                 path;
	// 依赖SnapshotStorage,暂时不理会它
    private final LocalSnapshotStorage   snapshotStorage;
	// 限流,暂时也不理它
    private final SnapshotThrottle       snapshotThrottle;

	
    public LocalSnapshotReader(LocalSnapshotStorage snapshotStorage, 
						       SnapshotThrottle snapshotThrottle, 
							   Endpoint addr,
                               RaftOptions raftOptions, 
							   String path) {
        super();
        this.snapshotStorage = snapshotStorage;
        this.snapshotThrottle = snapshotThrottle;
        this.addr = addr;
		// /var/folders/l2/v7kxnww15mjb9sps4yb25sqh0000gn/T/jraft_test_8977291920811/snapshot_99
        this.path = path;
        this.readerId = 0;
        this.metaTable = new LocalSnapshotMetaTable(raftOptions);
    } // end 
}	

(4). LocalSnapshotReader初始化(LocalSnapshotReader.init)

public boolean init(final Void v) {
	// **************************************************************************************
	// /var/folders/l2/v7kxnww15mjb9sps4yb25sqh0000gn/T/jraft_test_8977291920811/snapshot_99
	// **************************************************************************************
	final File dir = new File(this.path);
	if (!dir.exists()) {
		LOG.error("No such path {} for snapshot reader.", this.path);
		setError(RaftError.ENOENT, "No such path %s for snapshot reader", this.path);
		return false;
	}
	
	// **************************************************************************************
	// /var/folders/l2/v7kxnww15mjb9sps4yb25sqh0000gn/T/jraft_test_8977291920811/snapshot_99/__raft_snapshot_meta
	// **************************************************************************************
	final String metaPath = this.path + File.separator + JRAFT_SNAPSHOT_META_FILE;
	try {
		// **************************************************************************************
		// 读取磁盘上已经序列化了的文件
		// **************************************************************************************
		return this.metaTable.loadFromFile(metaPath);
	} catch (final IOException e) {
		LOG.error("Fail to load snapshot meta {}.", metaPath, e);
		setError(RaftError.EIO, "Fail to load snapshot meta from path %s", metaPath);
		return false;
	}
}

(5). LocalSnapshotReader生成可读的URL(LocalSnapshotReader.generateURIForCopy)

public String generateURIForCopy() {
	// 验证下Endpoint
	if (this.addr == null || this.addr.equals(new Endpoint(Utils.IP_ANY, 0))) {
		LOG.error("Address is not specified");
		return null;
	}
	
	if (this.readerId == 0) {
		final SnapshotFileReader reader = new SnapshotFileReader(this.path, this.snapshotThrottle);
		reader.setMetaTable(this.metaTable);
		
		// ********************************************************************
		// 委托给:SnapshotFileReader尝试打开一下文件
		// ********************************************************************
		if (!reader.open()) {
			LOG.error("Open snapshot {} failed.", this.path);
			return null;
		}
		
		// ********************************************************************
		// 通过:FileService生成一个读取文件的唯一id
		// ********************************************************************
		this.readerId = FileService.getInstance().addReader(reader);
		if (this.readerId < 0) {
			LOG.error("Fail to add reader to file_service.");
			return null;
		}
	}
	
	// 最终返回的是网络+读取id
	// remote://localhost:8081/57782045674793870
	return String.format(REMOTE_SNAPSHOT_URI_SCHEME + "%s/%d", this.addr, this.readerId);
} // end 

(6). FileService.addReader

// Long : readerId
// FileReader : 文件读取
private final ConcurrentMap<Long, FileReader> fileReaderMap = new ConcurrentHashMap<>();

// 自增数
private final AtomicLong                      nextId        = new AtomicLong();

public long addReader(final FileReader reader) {
	// 生成一个自增数
	final long readerId = this.nextId.getAndIncrement();
	// 通过map Hold住:FileReader
	if (this.fileReaderMap.putIfAbsent(readerId, reader) == null) {
		return readerId;
	} else {
		return -1L;
	}
}

(7). FileService.handleGetFile

// 前面分析仅仅只是一个生产可读的URL请求,在这里,是读取URL请求,进行处理来着的. 
public Message handleGetFile(final GetFileRequest request, final RpcRequestClosure done) {
	if (request.getCount() <= 0 || request.getOffset() < 0) {
		return RpcFactoryHelper //
			.responseFactory() //
			.newResponse(GetFileResponse.getDefaultInstance(), RaftError.EREQUEST, "Invalid request: %s", request);
	}
	
	// ******************************************************************************
	// 根据readerId来读取快照
	// ******************************************************************************
	final FileReader reader = this.fileReaderMap.get(request.getReaderId());
	if (reader == null) {
		return RpcFactoryHelper //
			.responseFactory() //
			.newResponse(GetFileResponse.getDefaultInstance(), RaftError.ENOENT, "Fail to find reader=%d",
				request.getReaderId());
	}

	if (LOG.isDebugEnabled()) {
		LOG.debug("GetFile from {} path={} filename={} offset={} count={}", done.getRpcCtx().getRemoteAddress(),
			reader.getPath(), request.getFilename(), request.getOffset(), request.getCount());
	}

	final ByteBufferCollector dataBuffer = ByteBufferCollector.allocate();
	final GetFileResponse.Builder responseBuilder = GetFileResponse.newBuilder();
	try {
		final int read = reader
			.readFile(dataBuffer, request.getFilename(), request.getOffset(), request.getCount());
		responseBuilder.setReadSize(read);
		responseBuilder.setEof(read == FileReader.EOF);
		final ByteBuffer buf = dataBuffer.getBuffer();
		BufferUtils.flip(buf);
		if (!buf.hasRemaining()) {
			// skip empty data
			responseBuilder.setData(ByteString.EMPTY);
		} else {
			// TODO check hole
			responseBuilder.setData(ZeroByteStringHelper.wrap(buf));
		}
		return responseBuilder.build();
	} catch (final RetryAgainException e) {
		return RpcFactoryHelper //
			.responseFactory() //
			.newResponse(GetFileResponse.getDefaultInstance(), RaftError.EAGAIN,
				"Fail to read from path=%s filename=%s with error: %s", reader.getPath(), request.getFilename(),
				e.getMessage());
	} catch (final IOException e) {
		LOG.error("Fail to read file path={} filename={}", reader.getPath(), request.getFilename(), e);
		return RpcFactoryHelper //
			.responseFactory() //
			.newResponse(GetFileResponse.getDefaultInstance(), RaftError.EIO,
				"Fail to read from path=%s filename=%s", reader.getPath(), request.getFilename());
	}
}

(8). 总结

从SnapshotReader接口签名上来看就是生成一个可读取的URL,而SnapshotReader它最主要的目的是:产生一个readId,然后,通过readId与FileReader进行关联,下次,发起请求时,根据这个readId找到:FileReader读取文件来着的.