(1). 概述
在这一小节,部析Calcite是如何通过SQL检索ES的过程.
(2). ElasticsearchSchema
package org.apache.calcite.adapter.elasticsearch;
public class ElasticsearchSchema extends AbstractSchema {
private final RestClient client;
private final ObjectMapper mapper;
private final Map<String, Table> tableMap;
// Query最大返回数量
private final int fetchSize;
// 1. 构造器
public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String index) {
this(client, mapper, index, ElasticsearchTransport.DEFAULT_FETCH_SIZE);
} // end
@VisibleForTesting
ElasticsearchSchema(
RestClient client,
ObjectMapper mapper,
String index,
int fetchSize) {
super();
this.client = Objects.requireNonNull(client, "client");
this.mapper = Objects.requireNonNull(mapper, "mapper");
Preconditions.checkArgument(fetchSize > 0, "invalid fetch size. Expected %s > 0", fetchSize);
this.fetchSize = fetchSize;
if (index == null) {
try {
// *************************************************************
// 2. 加载ES中所有的索引库
// *************************************************************
this.tableMap = createTables(indicesFromElastic());
} catch (IOException e) {
throw new UncheckedIOException("Couldn't get indices", e);
}
} else {
// 加载具体某一个索引库
this.tableMap = createTables(Collections.singleton(index));
}
} // end
// *********************************************************************
// 3.向ES服务器,发送GET请求,获得所有的索引库名称.
// *********************************************************************
private Set<String> indicesFromElastic() throws IOException {
final String endpoint = "/_alias";
final Response response = client.performRequest(new Request("GET", endpoint));
try (InputStream is = response.getEntity().getContent()) {
final JsonNode root = mapper.readTree(is);
if (!(root.isObject() && root.size() > 0)) {
final String message = String.format(Locale.ROOT, "Invalid response for %s/%s "
+ "Expected object of at least size 1 got %s (of size %d)", response.getHost(),
response.getRequestLine(), root.getNodeType(), root.size());
throw new IllegalStateException(message);
}
// 获得所有索引库的名称
Set<String> indices = Sets.newHashSet(root.fieldNames());
return indices;
}
}// end indicesFromElastic
// *******************************************************************
// 4. 创建所有的表
// *******************************************************************
private Map<String, Table> createTables(Iterable<String> indices) {
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
for (String index : indices) {
// 4.1 为每一个索引库创建一个:ElasticsearchTransport,专门负责与ES通信的
final ElasticsearchTransport transport = new ElasticsearchTransport(client, mapper, index, fetchSize);
// 4.2 创建ES Table,并把请求委托给:ElasticsearchTransport.
// 4.3 ElasticsearchTransport主要是实现了对:_search的封装.
builder.put(index, new ElasticsearchTable(transport));
}
return builder.build();
} // end
}
(3). ElasticsearchTransport
final class ElasticsearchTransport {
static final int DEFAULT_FETCH_SIZE = 5196;
private final ObjectMapper mapper;
// es提供的client
private final RestClient restClient;
final String indexName;
// 调用ES接口(http://localhost:9200/),获得es版本信息
final ElasticsearchVersion version;
// 调用ES接口(http://localhost:9200/${index}/_mapping),获得索引库的mapping信息.
final ElasticsearchMapping mapping;
// fetch大小
final int fetchSize;
// 1. 构造器
ElasticsearchTransport(final RestClient restClient,
final ObjectMapper mapper,
final String indexName,
final int fetchSize) {
this.mapper = Objects.requireNonNull(mapper, "mapper");
this.restClient = Objects.requireNonNull(restClient, "restClient");
this.indexName = Objects.requireNonNull(indexName, "indexName");
this.fetchSize = fetchSize;
// 调用ES接口(http://localhost:9200/),获得es版本信息
this.version = version(); // cache version
// 调用ES接口(http://localhost:9200/${index}/_mapping),获得索引库的mapping信息.
this.mapping = fetchAndCreateMapping(); // cache mapping
} // end
// 2. 检索
Function<ObjectNode, ElasticsearchJson.Result> search() {
return search(Collections.emptyMap());
} // end search
// 3. 带条件的检索
Function<ObjectNode, ElasticsearchJson.Result> search(final Map<String, String> httpParams) {
Objects.requireNonNull(httpParams, "httpParams");
return query -> {
// 钩子回调(你可以实现:java.util.function.Consumer,并调用Hook.addThread)
Hook.QUERY_PLAN.run(query);
// 请求ES的URL
// indexName = "books"
String path = String.format(Locale.ROOT, "/%s/_search", indexName);
final HttpPost post;
try {
URIBuilder builder = new URIBuilder(path);
httpParams.forEach(builder::addParameter);
post = new HttpPost(builder.build());
// 把所有的请求参数,转换成:json字符串
// SELECT _MAP['id'],_MAP['title'],_MAP['price'] FROM es.books WHERE _MAP['price'] > 60 LIMIT 2
// {"query":{"constant_score":{"filter":{"range":{"price":{"gt":60}}}}},"_source":["id","title","price"],"size":2}
// SELECT * FROM es.books WHERE _MAP['price'] > 10 offset 0 fetch next 10 rows only
// {"query":{"constant_score":{"filter":{"range":{"price":{"gt":10}}}}},"from":0,"size":10}
// 统计SQL
// SELECT count(*) FROM es.books WHERE _MAP['price'] > 50
// {"query":{"constant_score":{"filter":{"range":{"price":{"gt":50}}}}},"_source":false,"size":0,"stored_fields":"_none_","track_total_hits":true}
final String json = mapper.writeValueAsString(query);
LOGGER.debug("Elasticsearch Query: {}", json);
post.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
} catch (JsonProcessingException e) {
throw new UncheckedIOException(e);
}
// 发送HTTP请求
return rawHttp(ElasticsearchJson.Result.class).apply(post);
};
} // end search
}
(4). 总结
Calcite通过对SQL的解析,转换成HTTP请求,但是,对ES Client的设计,感觉不咋的.
ElasticsearchTable类在这里不进行深入,后面还要讲:TranslatableTable/ScannableTable/FilterableTable的区别,到时回来再深入.