(1). 概述
在这一小节,部析Calcite是如何通过SQL检索CSV的过程.
(2). CsvTest
package help.lixin.calcite;
import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
import org.apache.calcite.adapter.csv.CsvSchema;
import org.apache.calcite.adapter.csv.CsvTable;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.junit.Test;
public class CsvTest {
@Test
public void testQuery() throws Exception {
// 0.获取csv文件的路径,注意获取到文件所在上层路径就可以了
String path = CsvTest.class.getClassLoader().getResource("sales").getPath();
// 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等
CsvSchema csvSchema = new CsvSchema(new File(path), CsvTable.Flavor.FILTERABLE);
// 2.构建Connection
// 2.1 设置连接参数
Properties info = new Properties();
// 不区分sql大小写
info.setProperty("caseSensitive", "false");
// 2.2 获取标准的JDBC Connection
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
// 2.3 获取Calcite封装的Connection
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
// 以实现查询不同数据源的目的
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// 4.将不同数据源schema挂载到RootSchema,这里添加CsvSchema
rootSchema.add("csv", csvSchema);
// 5.执行SQL查询,通过SQL方式访问csv文件
String sql = "select * from csv.depts where name = 'Marketing' OR deptno = 30";
Statement statement = calciteConnection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
// 6.遍历打印查询结果集
System.out.println(ResultSetUtil.resultString(resultSet));
}
}
(3). sales/DEPTS.csv
DEPTNO:int,NAME:string
10,"Sales"
20,"Marketing"
30,"Accounts"
(4). 源码入口在哪?
我们知道Calcite是通过SQL进行检索CSV文件的,同时,它抽象出来了两个概念:Schema/Table,所以,我们要找到Schema入口.就是上面代码类:CsvSchema
// 1. Schema
public interface Schema {
// ....
}
// 2. AbstractSchema对Schema进行了实现
public class AbstractSchema implements Schema {
// ...
// 获得所有的Table信息.
protected Map<String, Table> getTableMap() {
return ImmutableMap.of();
}
}
// 3. CsvSchema继承了:AbstractSchema,并且,重写:getTableMap
public class CsvSchema extends AbstractSchema {
// 4. csv所在的目录,无须定位具体某个csv文件
private final File directoryFile;
// 5. SCANNABLE, FILTERABLE, TRANSLATABLE
// 全表 / 过滤 / RelNode
private final CsvTable.Flavor flavor;
// 6. 所有表的元数据信息
private Map<String, Table> tableMap;
// ... ...
protected Map<String, Table> getTableMap() {
if (tableMap == null) {
// 7. 扫描目录,并全创建
tableMap = createTableMap();
}
return tableMap;
}// end getTableMap
private Map<String, Table> createTableMap() {
final Source baseSource = Sources.of(directoryFile);
// 8. 过滤目录下:.gz/.csv/.json的文件
File[] files = directoryFile.listFiles((dir, name) -> {
final String nameSansGz = trim(name, ".gz");
return nameSansGz.endsWith(".csv")
|| nameSansGz.endsWith(".json");
});
// 9. 目录下不存在这些文件,则什么都不做
if (files == null) {
System.out.println("directory " + directoryFile + " not found");
files = new File[0];
}
// Build a map from table name to table; each file becomes a table.
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
// 10. 遍历所有的文件
for (File file : files) {
Source source = Sources.of(file);
// 11. 针对*.gz处理
Source sourceSansGz = source.trim(".gz");
// 12. 针对.json处理
final Source sourceSansJson = sourceSansGz.trimOrNull(".json");
if (sourceSansJson != null) { // 针对*.json处理
final Table table = new JsonScannableTable(source);
builder.put(sourceSansJson.relative(baseSource).path(), table);
}
final Source sourceSansCsv = sourceSansGz.trimOrNull(".csv");
if (sourceSansCsv != null) { // 针对*.csv处理
// ******************************************************************
// 13. 根据*.csv创建Table
// ******************************************************************
final Table table = createTable(source);
builder.put(sourceSansCsv.relative(baseSource).path(), table);
}
}
return builder.build();
}// end createTableMap
private Table createTable(Source source) {
switch (flavor) {
case TRANSLATABLE:
return new CsvTranslatableTable(source, null);
case SCANNABLE:
return new CsvScannableTable(source, null);
case FILTERABLE:
// ********************************************************************
// 带有谓词下推的处理.
// ********************************************************************
return new CsvFilterableTable(source, null);
default:
throw new AssertionError("Unknown flavor " + this.flavor);
}
} // end createTable
}
(5). CsvFilterableTable
public class CsvFilterableTable
// 1. 这是具体的某一张"表"了
extends CsvTable
// 2. 带有条件的表过滤
implements FilterableTable {
public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters) {
// 3. 读取csv文件的第一行,获得每一列的信息(列名称,数据类型)
final List<CsvFieldType> fieldTypes = getFieldTypes(root.getTypeFactory());
final String[] filterValues = new String[fieldTypes.size()];
// *****************************************************************************
// 4. 如果查询是带有条件的(select * from csv.depts where name = 'Marketing')
// filterValues = [null,"Marketing"],实际是代表要过滤的条件
// 注意:filterValues是数组形式,name在csv中的第一列,所以,过滤条件内容是在数组的第一列,而第0列为空.
// 通过谓词下推,让数据源在读取数据时,就减少数据的返回.这样在"投影"时,就能更多的节约内存.
// 而不是所有数据全部加载到内存,再通过内存去减少数据.
// *****************************************************************************
filters.removeIf(filter -> addFilter(filter, filterValues));
final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
// *********************************************************************
// 5. 通过CsvEnumerator实现csv的读取,注意,返回的是:Enumerator<Object[]>
// *********************************************************************
return new AbstractEnumerable<Object[]>() {
public Enumerator<Object[]> enumerator() {
return new CsvEnumerator<>(source, cancelFlag, false, filterValues,
CsvEnumerator.arrayConverter(fieldTypes, fields, false));
}
};
}
private boolean addFilter(RexNode filter, Object[] filterValues) {
if (filter.isA(SqlKind.AND)) {
// We cannot refine(remove) the operands of AND,
// it will cause o.a.c.i.TableScanNode.createFilterable filters check failed.
((RexCall) filter).getOperands().forEach(subFilter -> addFilter(subFilter, filterValues));
} else if (filter.isA(SqlKind.EQUALS)) {
final RexCall call = (RexCall) filter;
RexNode left = call.getOperands().get(0);
if (left.isA(SqlKind.CAST)) {
left = ((RexCall) left).operands.get(0);
}
final RexNode right = call.getOperands().get(1);
if (left instanceof RexInputRef
&& right instanceof RexLiteral) {
final int index = ((RexInputRef) left).getIndex();
if (filterValues[index] == null) {
filterValues[index] = ((RexLiteral) right).getValue2().toString();
return true;
} // end if
} // end if
} // end else if
return false;
} // end addFilter
}
(6). CsvEnumerator
public class CsvEnumerator<E> implements Enumerator<E> {
public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
String[] filterValues, RowConverter<E> rowConverter) {
this.cancelFlag = cancelFlag;
// 1. 数据行转换器
this.rowConverter = rowConverter;
// 2. 过滤条件
this.filterValues = filterValues == null ? null
: ImmutableNullableList.copyOf(filterValues);
try {
// 是否流式处理
if (stream) { // false
this.reader = new CsvStreamReader(source);
} else { // 3. 打开csv文件
this.reader = openCsv(source);
}
// 4. 跳过第一行
this.reader.readNext(); // skip header row
} catch (IOException e) {
throw new RuntimeException(e);
}
} // end
// *******************************************************************
// 5. while(resultSet.next()){ // ... }
// 当我们通过ResultSet去检索数据时,实则,是在读取csv文件,而且还是一行一行的读.
// *******************************************************************
public boolean moveNext() {
try {
outer:
for (;;) {
if (cancelFlag.get()) {
return false;
}
// 5.1 通过csv读取一行数据
final String[] strings = reader.readNext();
// 5.2 读取的数据不存在的情况下
if (strings == null) {
// 5.3 判断是否为流式读取
if (reader instanceof CsvStreamReader) {
try {
// 5.4 休眠2秒针
Thread.sleep(CsvStreamReader.DEFAULT_MONITOR_DELAY);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 5.5 跳到:for循环重新开始读取
continue;
}
// 5.6 如果不是流式读取,则代表读取到EOF了
current = null;
// 关闭流
reader.close();
// 返回false
return false;
} // end 读取EOF或Stream的情况下
if (filterValues != null) { // 5.7 需要在读取时就过滤的数据
// 遍历读取的CSV一行数据(10,"Sales")
for (int i = 0; i < strings.length; i++) {
// 判断是否为要过滤的列
// *********************************************************************
// 这也是 WHERE deptno = 1 OR name = 'xxx'不走索引的原理
// *********************************************************************
// filterValues=[null,"Marketing"]
// DEPTNO:int,NAME:string
String filterValue = filterValues.get(i);
if (filterValue != null) {
// 如果读取的数据列(strings[i])与filterValue不相等,则跳过数据解析阶段,继续for循环.
if (!filterValue.equals(strings[i])) {
continue outer;
}
}
}
} // 根据谓词下推,在读取数据时,就对数据进行过滤
// 5.8 通过RowConverter对读取的数据行进行解析.
current = rowConverter.convertRow(strings);
return true;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
} // end moveNext
}
(7). 总结
总体来说,对CSV的解析有了一个大体的入门,也可以参数上面的方式,自定义Schema和Table.
谓词下推确实是能有效解决数据的返回,但是,对IO的操作呢?实际仍然是一行一行的读取,所以,这也是为什么数据库里要有索引的存在的原因了. 对Calcite的使用:还是要尽可能的Hold住场景,它比较适合那些:索引在ES,数据在HBase.因为,这些数据源充当了索引.又或者说它适合于OLAP的场景,非实时的场景.