Skip to main content

Elasticsearch Catalog

Elasticsearch Catalog not only supports automatic mapping of ES metadata, but also combines Doris's distributed query planning capabilities with ES (Elasticsearch)'s full-text search capabilities to provide a more comprehensive OLAP analysis solution:

  1. Multi-index distributed Join queries in ES.

  2. Joint queries between tables in Doris and ES, with more complex full-text search filtering.

Prerequisites

Supports Elasticsearch 5.x and above.

Connecting to Elasticsearch

CREATE CATALOG es_catalog PROPERTIES (
'type' = 'es',
'hosts' = 'http://127.0.0.1:9200'
{ElasticsearchProperties}
);
  • {ElasticsearchProperties}
ParameterRequiredDefaultDescription
hostsYesES address, can be one or multiple, or ES load balancer address
userNoEmptyES username
passwordNoEmptyPassword for the corresponding user
doc_value_scanNotrueWhether to enable getting query field values through ES/Lucene columnar storage
keyword_sniffNotrueWhether to detect text.fields for string tokenization types in ES and query through keyword. Setting to false will match based on tokenized content
nodes_discoveryNotrueWhether to enable ES node discovery, default is true. Set to false in network isolated environments to connect only to specified nodes
sslNofalseWhether ES enables https access mode, currently implemented in fe/be as trust all
mapping_es_idNofalseWhether to map the _id field in ES index
like_push_downNotrueWhether to convert like to wildcard and push down to ES, will increase ES CPU consumption
include_hidden_indexNofalseWhether to include hidden indexes, default is false

Notes:

  1. Authentication currently only supports HTTP Basic authentication, and you need to ensure that the user has access to paths like /_cluster/state/, _nodes/http and Index read permissions; username and password are not required when cluster security authentication is not enabled.

  2. In 5.x and 6.x versions, the first type is taken by default for multiple types in one Index.

Hierarchy Mapping

Since Elasticsearch doesn't have the concept of Database, after connecting to ES, a unique Database will be automatically generated: default_db.

After switching to ES Catalog using the SWITCH command, it will automatically switch to default_db, no need to execute USE default_db command.

Column Type Mapping

ES TypeDoris TypeComment
nullnull
booleanboolean
bytetinyint
shortsmallint
integerint
longbigint
unsigned_longlargeint
floatfloat
half_floatfloat
doubledouble
scaled_floatdouble
datedateOnly supports default/yyyy-MM-dd HH:mm:ss/yyyy-MM-dd/epoch_millis formats
keywordstring
textstring
ipstring
constant_keywordstring
wildcardstring
nestedjson
objectjson
flattenedjsonSupported since version 3.1.4, 4.0.3
otherUNSUPPORTED

Array Types

Elasticsearch doesn't have an explicit array type, but its fields can contain 0 or more values.

To indicate that a field is an array type, you can add a specific doris structure annotation in the _meta section of the index mapping.

For Elasticsearch 6.x and earlier versions, please refer to _meta.

For example, suppose there's an index doc containing the following data structure:

{
"array_int_field": [1, 2, 3, 4],
"array_string_field": ["doris", "is", "the", "best"],
"id_field": "id-xxx-xxx",
"timestamp_field": "2022-11-12T12:08:56Z",
"array_object_field": [
{
"name": "xxx",
"age": 18
}
]
}

The array fields of this structure can be defined by adding field property definitions to the _meta.doris property of the target index mapping using the following commands.

# ES 7.x and above
curl -X PUT "localhost:9200/doc/_mapping?pretty" -H 'Content-Type:application/json' -d '
{
"_meta": {
"doris":{
"array_fields":[
"array_int_field",
"array_string_field",
"array_object_field"
]
}
}
}'

# ES 6.x and before
curl -X PUT "localhost:9200/doc/_mapping/_doc?pretty" -H 'Content-Type: application/json' -d '
{
"_meta": {
"doris":{
"array_fields":[
"array_int_field",
"array_string_field",
"array_object_field"
]
}
}
}

array_fields: Used to indicate fields that are array types.

flattened Type

For flattened type, when the enable_docvalue_scan property is false, the JSON data format read out is flattened. If the enable_docvalue_scan property is true, then the original JSON format is read out. Example as follows:

Index definition:

"mappings": {
"properties": {
"column1": {
"type": "keyword"
},
"extra": {
"type": "flattened"
}
}
}

Data:

{
"column1": 1,
"extra": {
"subcol1": "abc",
"sub_array": [
{"k1": "element1"},
{"k2": "element2"},
{"k3": "element3"}
]
}
}
  • When enable_docvalue_scan property is false

    The query result for extra column is:

    {
    "subcol1": "abc",
    "sub_array": [
    {"k1": "element1"},
    {"k2": "element2"},
    {"k3": "element3"}
    ]
    }
  • When enable_docvalue_scan property is true

    The query result for extra column is:

    ["abc","element1","element2","element3"]

Query Operations

After configuring the Catalog, you can query table data in the Catalog through the following methods:

-- 1. switch to catalog, use database and query
SWITCH es_ctl;
SELECT * FROM es_tbl LIMIT 10;

-- 2. use default_db directly
USE es_ctl.default_db;
SELECT * FROM es_tbl LIMIT 10;

-- 3. use full qualified name to query
SELECT * FROM es_ctl.default_db.es_tbl LIMIT 10;

For more advanced query features, please refer to the [Best Practices] section.

Best Practices

Filter Predicate Pushdown

ES Catalog supports filter predicate pushdown: filter conditions are pushed down to ES, so only data that truly meets the conditions will be returned, which can significantly improve query performance and reduce CPU, Memory, IO usage of Doris and Elasticsearch.

The following operators will be optimized into the corresponding ES Query:

SQL syntaxES 5.x+ syntax
=term query
interms query
> , < , >= , <=range query
andbool.filter
orbool.should
notbool.must_not
not inbool.must_not + terms query
is_not_nullexists query
is_nullbool.must_not + exists query
esqueryES native json form QueryDSL

Enable Columnar Scan to Optimize Query Speed (enable_docvalue_scan=true)

Set "enable_docvalue_scan" = "true"

After enabling, Doris will follow these two principles when getting data from ES:

  • Best Effort: Automatically detect whether the fields to be read have columnar storage enabled (doc_value: true). If all fields have columnar storage, Doris will get all field values from columnar storage.

  • Automatic Degradation: If any field to be retrieved doesn't have columnar storage, all field values will be parsed and retrieved from row storage _source.

Advantages:

By default, Doris On ES will get all required columns from row storage, i.e., _source. _source storage uses row-based + JSON format, which has inferior batch reading performance compared to columnar storage, especially when only a few columns are needed. In cases where only a few columns are needed, docvalue performance is about ten times better than _source performance.

Notes:

  1. text type fields don't have columnar storage in ES, so if any field value to be retrieved is of text type, it will automatically degrade to getting from _source.

  2. When the number of fields to retrieve is too many (>= 25), the performance of getting field values from docvalue will be basically the same as getting field values from _source.

  3. keyword type fields may appear empty due to the ignore_above parameter limitation for long text fields that exceed this limit. In this case, you need to disable enable_docvalue_scan and get results from _source.

Detect Keyword Type Fields

Set "enable_keyword_sniff" = "true"

In ES, you can import data directly without establishing an index. In this case, ES will automatically create a new index. For string type fields, ES will create fields that have both text type and keyword type, which is ES's multi fields feature, with mapping as follows:

"k4": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}

When filtering on k4, such as =, Doris On ES will convert the query to ES's TermQuery.

SQL filter condition:

k4 = "Doris On ES"

Converted to ES query DSL:

"term" : {
"k4": "Doris On ES"

}

Because k4's first field type is text, during data import it will be tokenized according to k4's configured tokenizer (if not set, it's the standard tokenizer) to get three Terms: doris, on, es, as analyzed by ES analyze API:

POST /_analyze
{
"analyzer": "standard",
"text": "Doris On ES"
}

The tokenization result is:

{
"tokens": [
{
"token": "doris",
"start_offset": 0,
"end_offset": 5,
"type": "<ALPHANUM>",
"position": 0
},
{
"token": "on",
"start_offset": 6,
"end_offset": 8,
"type": "<ALPHANUM>",
"position": 1
},
{
"token": "es",
"start_offset": 9,
"end_offset": 11,
"type": "<ALPHANUM>",
"position": 2
}
]
}

The query uses:

"term" : {
"k4": "Doris On ES"
}

The term Doris On ES cannot match any term in the dictionary and won't return any results. Enabling enable_keyword_sniff: true will automatically convert k4 = "Doris On ES" to k4.keyword = "Doris On ES" to completely match SQL semantics. The converted ES query DSL is:

"term" : {
"k4.keyword": "Doris On ES"
}

k4.keyword type is keyword, and data is written to ES as a complete term, so it can match.

Enable Node Auto Discovery (nodes_discovery=true)

Set "nodes_discovery" = "true"

When configured as true, Doris will find all available related data nodes (allocated shards above) from ES. If ES data node addresses are not accessible by Doris BE, set to false. ES cluster is deployed in an intranet isolated from the public Internet, and users access through proxy.

Whether ES Cluster Enables HTTPS Access Mode

Set "ssl" = "true"

Currently FE/BE implementation is trust all, this is a temporary solution, real user configured certificates will be used later.

Extended esquery() Function

Use esquery(field, QueryDSL) function to push down some queries that cannot be expressed in SQL, such as match_phrase, geoshape, etc., to ES for filtering. The first column name parameter of esquery is used to associate index, and the second parameter is the JSON representation of ES's basic Query DSL, enclosed in curly braces {}. The JSON root key must have one and only one, such as match_phrase, geo_shape, bool, etc.

match_phrase query:

select * from es_table where esquery(k4, '{
"match_phrase": {
"k4": "doris on es"
}
}');

geo related queries:

select * from es_table where esquery(k4, '{
"geo_shape": {
"location": {
"shape": {
"type": "envelope",
"coordinates": [
[
13,
53
],
[
14,
52
]
]
},
"relation": "within"
}
}
}');

bool query:

select * from es_table where esquery(k4, ' {
"bool": {
"must": [
{
"terms": {
"k1": [
11,
12
]
}
},
{
"terms": {
"k2": [
100
]
}
}
]
}
}');

Time Type Field Usage Recommendations

Only applicable to ES external tables. In ES Catalog, date types are automatically mapped to Date or Datetime.

In ES, time type fields are very flexible to use, but in ES external tables, if time type fields are not properly configured, filter conditions cannot be pushed down.

When creating an index, set time type format for maximum format compatibility:

 "dt": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}

When establishing this field in Doris, it's recommended to set it as date or datetime, or it can be set as varchar type. Using the following SQL statements can directly push filter conditions down to ES:

select * from doe where k2 > '2020-06-21';

select * from doe where k2 < '2020-06-21 12:00:00';

select * from doe where k2 < 1593497011;

select * from doe where k2 < now();

select * from doe where k2 < date_format(now(), '%Y-%m-%d');

Note:

  • If you don't set format for time type fields in ES, the default time type field format is:

    strict_date_optional_time||epoch_millis
  • If date fields imported to ES are timestamps, they need to be converted to ms. ES processes timestamps internally in ms, otherwise ES external tables will have display errors.

Getting ES Metadata Field ID

When importing documents without specifying _id, ES will assign a globally unique _id (primary key) to each document. Users can also specify a _id with special business meaning when importing.

If you need to get this field value in ES external tables, you can add a _id field of varchar type when creating the table:

CREATE EXTERNAL TABLE `doe` (
`_id` varchar COMMENT "",
`city` varchar COMMENT ""
) ENGINE=ELASTICSEARCH
PROPERTIES (
"hosts" = "http://127.0.0.1:8200",
"user" = "root",
"password" = "root",
"index" = "doe"
}

If you need to get this field value in ES Catalog, please set "mapping_es_id" = "true".

Note:

  1. Filter conditions for _id field only support = and in.

  2. _id field must be of varchar type.

Getting Globally Ordered Query Results

In scenarios like relevance ranking and priority display of important content, it's very useful for ES query results to be sorted by score. Doris queries ES to fully utilize the advantages of MPP architecture by pulling data according to the shard distribution of ES indexes. To get globally ordered sorting results, ES needs to be queried from a single point. This can be controlled by the session variable enable_es_parallel_scroll (default is true). When enable_es_parallel_scroll=false is set, Doris will send scroll queries without shard_preference and sort information to the ES cluster, thus getting globally ordered results. Note: Use with caution when the query result set is large.

Modifying scroll Request batch Size

The default batch size for scroll requests is 4064. It can be modified through the session variable batch_size.

Frequently Asked Questions

  1. Does it support ES clusters with X-Pack authentication?

    Supports all ES clusters using HTTP Basic authentication.

  2. Some queries are much slower than requesting ES directly

    Yes, such as _count related queries, etc. ES will directly read metadata related to the number of documents that meet the conditions internally, without filtering actual data.

  3. Can aggregation operations be pushed down?

    Currently Doris On ES doesn't support pushing down aggregation operations like sum, avg, min/max, etc. The calculation method is to batch stream all documents meeting the conditions from ES, then calculate in Doris.

Appendix

Principle of Doris Querying ES

+----------------------------------------------+
| |
| Doris +------------------+ |
| | FE +--------------+-------+
| | | Request Shard Location
| +--+-------------+-+ | |
| ^ ^ | |
| | | | |
| +-------------------+ +------------------+ | |
| | | | | | | | |
| | +----------+----+ | | +--+-----------+ | | |
| | | BE | | | | BE | | | |
| | +---------------+ | | +--------------+ | | |
+----------------------------------------------+ |
| | | | | | |
| | | | | | |
| HTTP SCROLL | | HTTP SCROLL | |
+-----------+---------------------+------------+ |
| | v | | v | | |
| | +------+--------+ | | +------+-------+ | | |
| | | | | | | | | | |
| | | DataNode | | | | DataNode +<-----------+
| | | | | | | | | | |
| | | +<--------------------------------+
| | +---------------+ | | |--------------| | | |
| +-------------------+ +------------------+ | |
| Same Physical Node | |
| | |
| +-----------------------+ | |
| | | | |
| | MasterNode +<-----------------+
| ES | | |
| +-----------------------+ |
+----------------------------------------------+
  1. FE will request the hosts specified when creating the table to get HTTP port information of all nodes and shard distribution information of indexes, etc. If the request fails, it will traverse the host list sequentially until success or complete failure.

  2. When querying, a query plan will be generated and sent to corresponding BE nodes based on some node information and index metadata information obtained by FE.

  3. BE nodes will request ES nodes deployed locally first according to the proximity principle. BE gets data concurrently from each shard of ES index through HTTP Scroll method streaming from _source or docvalue.

  4. After Doris completes the calculation, it returns the results to the user.