文章随手写的,有需要的自己看看。
ES 相关概念
index:index 在 es 中不是指索引,其对应数据库中的 table 概念。一个 index 含有多个 document,一个 document 是一个 json。
mapping:针对于 index 来说的,指一个 index 的结构,对应数据库中的 table schema。
mapping type:一般也叫 type,在老版 ES 中,type 的默认值为 _doc,但是在 ES8 后移除了 type。但是 SR 有对 type 做出了兼容。
doc_values:ES 为了更加高效的支持 sorting 和 aggregations 操作,会对大部分字段被创建时自动创建对应的 doc_value(doc_values 采用列式存储)。然而 text 和 annotated_text 不支持创建对应的doc_values。
_source:可以理解为原始数据,即用户存进去是什么样就是什么样,不会被改动。比如数组打平问题,假设用户输入数组 [1, [2, 3]] ,在 _source 字段中,仍然以原样 [1, [2, 3] 存储,但是在 doc_values 中会被自动打平成 [1, 2, 3] 。
keyword :如果有些文本字段在 ES 中不希望被分词索引,可以使用 keyword 类型。
text 和 keyword 的区别:
text:
- 会分词,然后进行索引
- 支持模糊、精确查询
- 不支持聚合
- 不支持 doc_value
keyword:
- 不进行分词,直接索引
- 支持模糊、精确查询
- 支持聚合,keyword 字段允许被存储在 doc_values 中
更多参考:https://cloud.tencent.com/developer/article/1672344
Create Table 流程
收到 create table 语句后,在 LocalMetastore 中执行 createEsTable()
方法。
在 createEsTable()
中创建 EsTable 后,将其注册到 EsRepository 中。EsRepository 会在内存中负责维护 SR 中所有的 EsTable 和 EsRestClient。其中每一个 EsTable 都有一个对应的 EsRestClient。
public class EsRepository extends LeaderDaemon {
private Map<Long, EsTable> esTables;
private Map<Long, EsRestClient> esClients;
}
EsTable:顾名思义,保存 ES table 相关信息。
EsRestClient:用于发起 http 请求,通过 restful 协议和 ES 交互。
EsRepository 通过继承 LeaderDaemon 类,实现在 FE 启动后定时和 ES 集群同步 EsTable 的元信息。
为什么 HiveRepository 和 IcebergRepository 不这么做。
protected void runAfterCatalogReady() {
for (EsTable esTable : esTables.values()) {
try {
esTable.syncTableMetaData(esClients.get(esTable.getId()));
} catch (Throwable e) {
LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", esTable.getName(),
e);
esTable.setEsTablePartitions(null);
esTable.setLastMetaDataSyncException(e);
}
}
}
EsTable 元信息同步
EsRepository 会定时调用 EsTable 中的 syncTableMetaData(EsRestClient client)
方法同步 EsTable 和 ES 集群中的元信息。
元信息同步由 EsMetaStateTracker 负责,在同步前会创建一个 SearchContext,这个 context 会贯穿同步的三个阶段。拿到的信息都会放在 SearchContext 中。
VersionPhase:
通过 GET ``/
获得 ES 集群信息,其实就拿了个 version.number。
{
"name": "00ec8680adec",
"cluster_name": "docker-cluster",
"cluster_uuid": "nkNEidmtS_yocGW7wIHdlQ",
"version": {
"number": "8.3.2",
"build_type": "docker",
"build_hash": "8b0b1f23fbebecc3c88e4464319dea8989f374fd",
"build_date": "2022-07-06T15:15:15.901688194Z",
"build_snapshot": false,
"lucene_version": "9.2.0",
"minimum_wire_compatibility_version": "7.17.0",
"minimum_index_compatibility_version": "7.0.0"
},
"tagline": "You Know, for Search"
}
MappingPhase:
通过 GET {index}/_mapping
获取指定 index 的 mapping。
{
"index": {
"mappings": {
"properties": {
"age": {
"type": "long"
},
"array1": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
}
在 MappingPhase,使用如下代码解析获取的 json 数据。
for (Column col : searchContext.columns()) {
String colName = col.getName();
// if column exists in StarRocks Table but no found in ES's mapping, we choose to ignore this situation?
if (!properties.has(colName)) {
continue;
}
JSONObject fieldObject = properties.optJSONObject(colName);
resolveKeywordFields(searchContext, fieldObject, colName);
resolveDocValuesFields(searchContext, fieldObject, colName);
}
resolveKeywordFields() 会把 keyword field 放到 searchContext.resolveKeywordFields 中,resolveDocValuesFields() 会把 doc_values field 放到 searchContext.docValueFieldsContext 中。
PartitionPhase
通过 GET ``/{index}/_search_shards
获取 index 在 ES 中的分片。
BUG 来源地:后面我们 be 发送请求是根据 shard 中的 ip 地址,而不是添加外表时指定的地址。如果 ES 中 shard 地址和 SR 添加外表的地址不同,会导致 BE 无法正常请求 ES。这种情况通常发生于一台机器会有多个网卡的情况,然后用户又没有指定 ES 的 IP。
{
"nodes": {
"wuGFWnAjSg6xXidaxX7azQ": {
"name": "node-1",
"ephemeral_id": "5nXFzMFTRxai5PMN_fLYMQ",
"transport_address": "172.26.195.67:9200",
"attributes": {}
},
"x-uhMBJ9SuGxYBLMGpwwWw": {
"name": "node-2",
"ephemeral_id": "Y1Oct7tSRaC6L-F93Sw3tw",
"transport_address": "172.26.195.68:9200",
"attributes": {}
}
},
"indices": {
"danny": {}
},
"shards": [
[
{
"state": "STARTED",
"primary": true,
"node": "x-uhMBJ9SuGxYBLMGpwwWw",
"relocating_node": null,
"shard": 0,
"index": "danny",
"allocation_id": {
"id": "d-TAtX0hS4WxUVzy0S3HuA"
}
},
{
"state": "STARTED",
"primary": false,
"node": "wuGFWnAjSg6xXidaxX7azQ",
"relocating_node": null,
"shard": 0,
"index": "danny",
"allocation_id": {
"id": "-PEjbmMBQIOqbCEQcFidEw"
}
}
],
[
{
"state": "STARTED",
"primary": true,
"node": "wuGFWnAjSg6xXidaxX7azQ",
"relocating_node": null,
"shard": 1,
"index": "danny",
"allocation_id": {
"id": "dKDxXub4RcKXIzf59oc3vQ"
}
},
{
"state": "STARTED",
"primary": false,
"node": "x-uhMBJ9SuGxYBLMGpwwWw",
"relocating_node": null,
"shard": 1,
"index": "danny",
"allocation_id": {
"id": "Ew-HGVb7RfW8kZPy4nBbiA"
}
}
],
[
{
"state": "STARTED",
"primary": true,
"node": "x-uhMBJ9SuGxYBLMGpwwWw",
"relocating_node": null,
"shard": 2,
"index": "danny",
"allocation_id": {
"id": "cy-71kM_RLGDPJyeVwzLJw"
}
},
{
"state": "STARTED",
"primary": false,
"node": "wuGFWnAjSg6xXidaxX7azQ",
"relocating_node": null,
"shard": 2,
"index": "danny",
"allocation_id": {
"id": "ClwYb-LxQ7e319c-8yI9eg"
}
}
],
[
{
"state": "STARTED",
"primary": false,
"node": "x-uhMBJ9SuGxYBLMGpwwWw",
"relocating_node": null,
"shard": 3,
"index": "danny",
"allocation_id": {
"id": "bWh7KAPITcW10_1uKYPPEA"
}
},
{
"state": "STARTED",
"primary": true,
"node": "wuGFWnAjSg6xXidaxX7azQ",
"relocating_node": null,
"shard": 3,
"index": "danny",
"allocation_id": {
"id": "p-Lw1PFkQdy5CgUfLdhPZA"
}
}
],
[
{
"state": "STARTED",
"primary": true,
"node": "x-uhMBJ9SuGxYBLMGpwwWw",
"relocating_node": null,
"shard": 4,
"index": "danny",
"allocation_id": {
"id": "qva4hTx6QU2LR19453jazA"
}
},
{
"state": "STARTED",
"primary": false,
"node": "wuGFWnAjSg6xXidaxX7azQ",
"relocating_node": null,
"shard": 4,
"index": "danny",
"allocation_id": {
"id": "125HJtYCS62qxeve_onPDA"
}
}
]
]
}
一个 index 会有多个 shard,一个 shard 又有多个副本,然后这些 shard 会分布在多个 node 上。SR 以 EsShardPartitions 保存一个 index 所有的 shard 信息。
public class EsShardPartitions {
private final String indexName;
// shardid -> host1, host2, host3
private Map<Integer, List<EsShardRouting>> shardRoutings;
}
再通过 GET /_nodes/http
获取节点地址信息,并将地址信息写入上面的 EsShardPartitions 中。
{
"_nodes": {
"total": 2,
"successful": 2,
"failed": 0
},
"cluster_name": "wyf",
"nodes": {
"wuGFWnAjSg6xXidaxX7azQ": {
"name": "node-1",
"transport_address": "172.26.195.67:9200",
"host": "172.26.195.67",
"ip": "172.26.195.67",
"version": "6.5.3",
"build_flavor": "default",
"build_type": "tar",
"build_hash": "159a78a",
"roles": [
"master",
"data",
"ingest"
],
"http": {
"bound_address": [
"172.26.195.67:8200"
],
"publish_address": "172.26.195.67:8200",
"max_content_length_in_bytes": 104857600
}
},
"x-uhMBJ9SuGxYBLMGpwwWw": {
"name": "node-2",
"transport_address": "172.26.195.68:9200",
"host": "172.26.195.68",
"ip": "172.26.195.68",
"version": "6.5.3",
"build_flavor": "default",
"build_type": "tar",
"build_hash": "159a78a",
"roles": [
"master",
"data",
"ingest"
],
"http": {
"bound_address": [
"172.26.195.68:8200"
],
"publish_address": "172.26.195.68:8200",
"max_content_length_in_bytes": 104857600
}
}
}
}
总的来说,PartitionPhase 就是获取一个 index 所有的 shard 信息以及该 shard 所在 node 的信息(IP 等等)。
发起查询
FE
EsScanNode 负责 fragment plan 的生成。
protected void toThrift(TPlanNode msg) {
if (EsTable.TRANSPORT_HTTP.equals(table.getTransport())) {
msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE;
} else {
msg.node_type = TPlanNodeType.ES_SCAN_NODE;
}
Map<String, String> properties = Maps.newHashMap();
properties.put(EsTable.USER, table.getUserName());
properties.put(EsTable.PASSWORD, table.getPasswd());
properties.put(EsTable.ES_NET_SSL, String.valueOf(table.sslEnabled()));
TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt());
esScanNode.setProperties(properties);
if (table.isDocValueScanEnable()) {
esScanNode.setDocvalue_context(table.docValueContext());
properties.put(EsTable.DOC_VALUES_MODE, String.valueOf(useDocValueScan(desc, table.docValueContext())));
}
if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) {
esScanNode.setFields_context(table.fieldsContext());
}
msg.es_scan_node = esScanNode;
}
可以看到主要就是装填一些关于 ES 集群的信息,比如是否开启 SSL,ES 的鉴权用户名密码,巴拉巴拉。
主要关注 int
useDocValueScan()
方法,其判断是否让 be 使用 doc_value 进行扫描。如果 select 的字段中存在不支持 doc_value 的字段(比如 text),则降级为 _source 搜索,否则使用 doc_values 进行搜索。
其他重要方法:
List<TScanRangeLocations> computeShardLocations()
:计算每一个 shard 应该被分配到哪个 BE 上进行计算(争取被计算的 shard 所在节点和 BE 是同一个节点),类似于 hdfs 通过移动计算的方式减少网络开销。
getNodeExplainString()
:存粹输出 explain 信息用的。因为我们具体的 Query DSL 是在 BE 生成的,正常情况下 FE 是拿不到具体的 Query DSL 信息。但是为了生成 explain 信息,SR 实现了 QueryBuilder 和 QueryConverter 这两个类,模拟 BE 的行为生成了 Query DSL 信息,返回个用户。
Q:既然 FE 都已经生成了 Query DSL 信息,为什么 BE 还要再去生成一遍,直接发下去不就好了?
A:将来会改。
BE
ESDataSource 中的 open()
负责查询前的准备工作。
open()
open()
分别调用如下主要方法:
_build_conjuncts()
:将 ES 能够处理的 predicates 用 EsPredicate 包装起来,存放在 _predicates 字段中。_normalize_conjuncts()
:扫描_conjunct_ctxs
存放的 Expr 表达式,移除在_build_conjuncts()
中已经下推的 Expr。不然就重复应用 Expr 了。_create_scanner()
:创建 ESScanReader。- 在创建 ESScanReader 时,会调用
ESScrollQueryBuilder::build()
方法生成 Query DSL 语句。长下面这个样子: JSON { "query": { "bool": { "filter": [{ "bool": { "should": [{ "term": { "age": "1" } }] } }] } }, "stored_fields": "_none_", "docvalue_fields": [ "age", "array1" ], "sort": [ "_doc" ], "size": 4096 }
ESScanReader 创建过程中,里面主要涉及请求 url 的组装,鉴权用户名密码的组装。
注意如下方法:
if (props.find(KEY_TERMINATE_AFTER) != props.end()) {
_exactly_once = true;
// just send a normal search against the elasticsearch with additional terminate_after param to achieve terminate early effect when limit take effect
if (_type.empty()) {
_search_url = fmt::format("{}/{}/_search?terminate_after={}&preference=_shards:{}&{}", _target, _index,
props.at(KEY_TERMINATE_AFTER), _shards, filter_path);
} else {
_search_url = fmt::format("{}/{}/{}/_search?terminate_after={}&preference=_shards:{}&{}", _target, _index,
_type, props.at(KEY_TERMINATE_AFTER), _shards, filter_path);
}
} else {
_exactly_once = false;
// scroll request for scanning
// add terminate_after for the first scroll to avoid decompress all postings list
if (_type.empty()) {
_init_scroll_url = fmt::format("{}/{}/_search?scroll={}&preference=_shards:{}&{}", _target, _index,
_scroll_keep_alive, _shards, filter_path);
} else {
_init_scroll_url = fmt::format("{}/{}/{}/_search?scroll={}&preference=_shards:{}&{}", _target, _index,
_type, _scroll_keep_alive, _shards, filter_path);
}
_next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + filter_path;
}
可以看到 _exactly_once 变量。当上层向下发起的请求存在 limit 算子,且允许该 limit 算子下推到 ES 时,这里就会设置 _exactly_once 为 true。
只有当所有算子允许下推到 ES 时,limit 算子才允许跟着下推到 ES。
在 exactly_once=true
模式下,BE 会直接使用 _search 进行搜索,请求一次就行。如果 exactly_once=false
模式,BE 在执行搜索时会携带 scroll 参数,类似于一个游标,告诉 ES 我上次读取到哪里,然后你从上次读的地方继续返回(可以理解为分页查询,每次返回一点)。
数据量比较大时肯定采用 scroll 模式读取。
其后,ESDataSource 也会调用 ESScanReader的 open()
方法。因为在 SR 的架构设计中,open()
方法是做准备工作,不执行请求。然而相对于 ES 来说,只要你请求了 URL 就返回结果,不存在准备工作。所以可以看到如下代码:
Status ESScanReader::open() {
_is_first = true;
if (_exactly_once) {
RETURN_IF_ERROR(_network_client.init(_search_url));
} else {
RETURN_IF_ERROR(_network_client.init(_init_scroll_url));
}
// phase open, we cached the first response for `get_next` phase
Status status = _network_client.execute_post_request(_query, &_cached_response);
if (!status.ok() || _network_client.get_http_status() != 200) {
return Status::InternalError(err_msg);
}
VLOG(1) << "open _cached response: " << _cached_response;
return Status::OK();
}
在 open()
阶段,我们发起了请求,验证了 url 的合法性,ES 就直接把结果返回给我们了。SR 不能白白浪费这一次请求,故先把返回的 json 结果暂存在 _cached_response 中,等到 get_next()
被调用的时候直接解析。
get_next()
ESDataSource
ESDataSouce 中的 get_next()
主要分为三步:
Status ESDataSource::get_next(RuntimeState* state, vectorized::ChunkPtr* chunk) {
RETURN_IF_ERROR(_es_reader->get_next(&_batch_eof, _es_scroll_parser));
RETURN_IF_ERROR(_es_scroll_parser->fill_chunk(state, chunk, &_line_eof));
RETURN_IF_ERROR(ExecNode::eval_conjuncts(_conjunct_ctxs, ck));
return Status::OK();
}
- 调用 ESScanReader 的
get_next()
方法,在 ESScanReader 中,会直接调用 ScrollParser 的parse()
方法。如果是第一次请求,会使用open()
阶段缓存的 _cached_response。 - 调用 ScrollParser 的
fill_chunk()
方法,把parse()
的结果保存到 chunk 中。chunk 中有很多列,就一个一个 column 填充。目前按行填充,效率低?。 eval_conjuncts()
对 fill 后的 chunk 执行表达式。执行的是那些没有被下推到 ES 的表达式。
ESScanReader
ESScanReader 中的 get_next()
逻辑比较简单,不做阐述。主要就是处理 scroll 读取逻辑,每次读一部分。然后再调用 ScrollParser 的 parse()
方法解析得到的 json 数据。
ScrollParser
这里稍微说明下 ScrollParser 的 fill_chunk()
方法,parse()
其实只是保存了 json 数据,真正的解析工作由 fill_chunk()
方法负责。
Status ScrollParser::fill_chunk(RuntimeState* state, ChunkPtr* chunk, bool* line_eos) {
*chunk = std::make_shared<Chunk>();
std::vector<SlotDescriptor*> slot_descs = _tuple_desc->slots();
// 剩余待插入数据条数
size_t left_sz = _size - _cur_line;
// 保证待插入数据条数不超过 chunk_size()
size_t fill_sz = std::min(left_sz, (size_t)state->chunk_size());
// 初始化 chunk 里面的 column,可以使用 ChunkHelper 简化?
for (auto& slot_desc : slot_descs) {
ColumnPtr column = ColumnHelper::create_column(slot_desc->type(), slot_desc->is_nullable());
column->reserve(fill_sz);
(*chunk)->append_column(std::move(column), slot_desc->id());
}
auto slots = _tuple_desc->slots();
// TODO: we could fill chunk by column rather than row
for (size_t i = 0; i < fill_sz; ++i) {
const rapidjson::Value& obj = _inner_hits_node[_cur_line + i];
// 判断是否读取 doc_values,在返回的结果中,不可能同时存在 fields 和 _source 字段。
// fields 字段就是 doc_values 的意思。
bool pure_doc_value = _is_pure_doc_value(obj);
bool has_source = obj.HasMember(FIELD_SOURCE);
bool has_fields = obj.HasMember(FIELD_FIELDS);
// fields 和 _source 同时不存在,就直接给 column 插默认值。
if (!has_source && !has_fields) {
for (size_t col_idx = 0; col_idx < slots.size(); ++col_idx) {
SlotDescriptor* slot_desc = slot_descs[col_idx];
ColumnPtr& column = (*chunk)->get_column_by_slot_id(slot_desc->id());
if (slot_desc->is_nullable()) {
column->append_default();
} else {
return Status::DataQualityError(
fmt::format("col `{}` is not null, but value from ES is null", slot_desc->col_name()));
}
}
continue;
}
// 检查,前面说了,fields 和 _source 必定存在一个,而且两个不可能同时存在
DCHECK(has_source ^ has_fields);
const rapidjson::Value& line = has_source ? obj[FIELD_SOURCE] : obj[FIELD_FIELDS];
for (size_t col_idx = 0; col_idx < slots.size(); ++col_idx) {
SlotDescriptor* slot_desc = slot_descs[col_idx];
ColumnPtr& column = (*chunk)->get_column_by_slot_id(slot_desc->id());
// _id field must exists in every document, this is guaranteed by ES
// if _id was found in tuple, we would get `_id` value from inner-hit node
// json-format response would like below:
// "hits": {
// "hits": [
// {
// "_id": "UhHNc3IB8XwmcbhBk1ES",
// "_source": {
// "k": 201,
// }
// }
// ]
// }
// 这个分支不会走到,留着干什么未知?而且在 ES8中,貌似根本就没有 _id 字段了?
if (slot_desc->col_name() == FIELD_ID) {
// actually this branch will not be reached, this is guaranteed by Doris FE.
if (pure_doc_value) {
return Status::RuntimeError("obtain `_id` is not supported in doc_values mode");
}
PrimitiveType type = slot_desc->type().type;
DCHECK(type == TYPE_CHAR || type == TYPE_VARCHAR);
const auto& _id = obj[FIELD_ID];
Slice slice(_id.GetString(), _id.GetStringLength());
_append_data<TYPE_VARCHAR>(column.get(), slice);
continue;
}
// if pure_doc_value enabled, docvalue_context must contains the key
// todo: need move all `pure_docvalue` for every tuple outside fill_tuple
// should check pure_docvalue for one table scan not every tuple
const char* col_name = pure_doc_value ? _doc_value_context->at(slot_desc->col_name()).c_str()
: slot_desc->col_name().c_str();
auto has_col = line.HasMember(col_name);
if (has_col) {
const rapidjson::Value& col = line[col_name];
// doc value
bool is_null = col.IsNull() || (pure_doc_value && col.IsArray() && (col.Empty() || col[0].IsNull()));
if (!is_null) {
// append value from ES to column
RETURN_IF_ERROR(_append_value_from_json_val(column.get(), slot_desc->type(), col, pure_doc_value));
continue;
}
// handle null col
if (slot_desc->is_nullable()) {
_append_null(column.get());
} else {
return Status::DataQualityError(
fmt::format("col `{}` is not null, but value from ES is null", slot_desc->col_name()));
}
} else {
// if don't has col in ES , append a default value
_append_null(column.get());
}
}
}
_cur_line += fill_sz;
return Status::OK();
}
ScrollParser 中由_append_value_from_json_val()
具体负责将一个 value 值插入 column 中。比较简单,就是用了模版方法,包含了各种类型转换。
原创文章,作者:Smith,如若转载,请注明出处:https://www.inlighting.org/archives/starrocks-es-external-table-source-code-reading
评论列表(7条)
为什么这个ElasticSearch不能直接对接jni scanner? 这样直接升级ES的Jar就行了。里面一坨http的啥东西,还要拼凑json.
你好。请问StarRocks支持Opensearch外部表吗,测试了一下,不行。不知道是我配置问题还是不支持的问题。
@Musa:他们兼容 es 的接口吗,不兼容肯定不支持啊。
@Smith:Opensearch是基于ES 7.10开发的,兼容ES 7.10及以下的接口。StarRocks报错是这个[42000][1064] Failed to connect to ES server, errmsg is: The requested URL returned error: 400。应该是兼容性问题
@Musa:可以看看 fe.log,里面有更加具体的报错信息
@Smith:感谢,我去看看。
Smith 🐂B!