Logstash Doris output plugin
Introduction
Logstash is a log ETL framework (collect, preprocess, send to storage systems). It supports custom output plugins for writing data to storage systems; the Logstash Doris output plugin is the output plugin for Doris.
The Logstash Doris output plugin calls the Doris Stream Load HTTP interface to write data into Doris in real time, providing capabilities such as multi-threaded concurrency, retry on failure, customizable Stream Load formats and parameters, and write-speed reporting.
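Under the hood, each batch write is an HTTP request to the Stream Load endpoint. As a rough sketch of what the plugin sends per batch (the host, credentials, database, table, and file name here are placeholder assumptions, not plugin defaults), one load is equivalent to:
curl --location-trusted -u user:password \
    -H "format: json" -H "read_json_by_line: true" \
    -T batch.json \
    http://fe_host:8030/api/log_db/log_table/_stream_load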
Using the Logstash Doris output plugin involves three main steps:
- Install the plugin into Logstash
- Configure the Doris output address and other parameters
- Start Logstash to write data into Doris in real time
Installation
Get the Plugin
You can download the Logstash Doris output plugin from the official website, or compile it from source yourself.
- Download from the official website
- Compile from source
cd extension/logstash/
gem build logstash-output-doris.gemspec
Install the Plugin
- Standard installation
${LOGSTASH_HOME} is the Logstash installation directory. Run its bin/logstash-plugin command to install the plugin:
${LOGSTASH_HOME}/bin/logstash-plugin install logstash-output-doris-1.0.0.gem
Validating logstash-output-doris-1.0.0.gem
Installing logstash-output-doris
Installation successful
Standard installation automatically installs the Ruby modules the plugin depends on. Without network access this step hangs and cannot complete; in that case, download the zip package that bundles the dependencies and perform a fully offline installation. Note that file:// must be used to point at the local filesystem.
- Offline installation
${LOGSTASH_HOME}/bin/logstash-plugin install file:///tmp/logstash-output-doris-1.0.0.zip
Installing file: logstash-output-doris-1.0.0.zip
Resolving dependencies.........................
Install successful
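To confirm that the plugin is registered, you can list the installed plugins (the exact output format depends on your Logstash version):
${LOGSTASH_HOME}/bin/logstash-plugin list | grep doris
logstash-output-doris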
Configuration
The configuration options of the Logstash Doris output plugin are as follows:
Configuration | Description |
---|---|
http_hosts | Stream Load HTTP addresses, as an array of strings with one or more elements, each of the form host:port. For example: ["http://fe1:8030", "http://fe2:8030"] |
user | Doris username; this user needs import privileges on the target Doris database and table |
password | Password of the Doris user |
db | Doris database name to write to |
table | Doris table name to write to |
label_prefix | Prefix of the Doris Stream Load label; the final label is {label_prefix}_{db}_{table}_{yyyymmdd_hhmmss}_{uuid}. The default is logstash |
headers | Headers parameter of Doris Stream Load; the syntax is a Ruby map, for example: headers => { "format" => "json" "read_json_by_line" => "true" } |
mapping | Mapping from Logstash fields to Doris table columns; see the usage examples in later sections |
message_only | A special form of mapping that outputs only the Logstash @message field to Doris; defaults to false |
max_retries | Number of retries for failed Doris Stream Load requests; defaults to -1, which retries indefinitely to guarantee data reliability |
log_request | Whether to log Doris Stream Load request and response metadata, used for troubleshooting; defaults to false |
log_speed_interval | Interval in seconds for logging write speed; defaults to 10. Setting it to 0 disables this logging |
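Putting these options together, a minimal doris output section might look like the following sketch (the address, credentials, and db/table names are placeholders to adapt to your environment):
output {
  doris {
    http_hosts => ["http://fe1:8030"]
    user => "root"
    password => ""
    db => "log_db"
    table => "log_table"
    headers => {
      "format" => "json"
      "read_json_by_line" => "true"
    }
    mapping => {
      "message" => "%{message}"
    }
  }
}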
Usage Examples
TEXT Log Collection Example
This example demonstrates TEXT log collection using the logs of Doris FE.
1. Data
FE log files are usually located at fe/log/fe.log under the Doris installation directory. They are typical Java program logs with fields such as timestamp, log level, thread name, code position, and log content. Besides normal log lines they also contain exception logs with stacktraces; a stacktrace spans multiple lines, so collection and storage must combine the main log line and its stacktrace into a single log entry.
2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip
2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0}
org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive
at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT]
2. Create the Table
The table schema includes fields such as the log's creation time, collection time, hostname, log file path, log type, log level, thread name, code position, and log content.
CREATE TABLE `doris_log` (
`log_time` datetime NULL COMMENT 'log content time',
`collect_time` datetime NULL COMMENT 'log agent collect time',
`host` text NULL COMMENT 'hostname or ip',
`path` text NULL COMMENT 'log file path',
`type` text NULL COMMENT 'log type',
`level` text NULL COMMENT 'log level',
`thread` text NULL COMMENT 'log thread',
`position` text NULL COMMENT 'log code position',
`message` text NULL COMMENT 'log message',
INDEX idx_host (`host`) USING INVERTED COMMENT '',
INDEX idx_path (`path`) USING INVERTED COMMENT '',
INDEX idx_type (`type`) USING INVERTED COMMENT '',
INDEX idx_level (`level`) USING INVERTED COMMENT '',
INDEX idx_thread (`thread`) USING INVERTED COMMENT '',
INDEX idx_position (`position`) USING INVERTED COMMENT '',
INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`log_time`)
COMMENT 'OLAP'
PARTITION BY RANGE(`log_time`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "true",
"compaction_policy" = "time_series"
);
3. Logstash Configuration
Logstash has two main kinds of configuration files: one for Logstash as a whole, and one for each log collection pipeline.
The Logstash-wide configuration file is usually config/logstash.yml. To improve write performance into Doris, increase the batch size and batch delay; for logs averaging a few hundred bytes per line, 1,000,000 rows and 10 s are recommended.
pipeline.batch.size: 1000000
pipeline.batch.delay: 10000
A per-pipeline configuration file such as logstash_doris_log.conf consists of three parts, corresponding to the stages of ETL:
- input reads the raw data
- filter performs data transformation
- output sends the data out
# 1. The input part reads the raw data
# file input is an input plugin that reads from the configured log file path. The multiline codec appends
# any line that does not start with a timestamp to the previous line, merging a stacktrace with its main
# log line. file input stores the log content in the @message field and also produces metadata fields such
# as host and log.file.path. Here we additionally use add_field to add a field named type with the value fe.log.
input {
file {
path => "/mnt/disk2/xiaokang/opt/doris_master/fe/log/fe.log"
add_field => {"type" => "fe.log"}
codec => multiline {
# valid line starts with timestamp
pattern => "^%{TIMESTAMP_ISO8601} "
# any line not starting with a timestamp should be merged with the previous line
negate => true
what => "previous"
}
}
}
# 2. The filter part performs data transformation
# grok is a commonly used transformation plugin. It has built-in patterns such as TIMESTAMP_ISO8601 for
# parsing timestamps, and also supports regular expressions for extracting fields.
filter {
grok {
match => {
# parse log_time, level, thread, position fields from message
"message" => "%{TIMESTAMP_ISO8601:log_time} (?<level>[A-Z]+) \((?<thread>[^\[]*)\) \[(?<position>[^\]]*)\]"
}
}
}
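# As an illustration (these lines are comments, not configuration): for the first sample FE log line
# above, this grok pattern would extract roughly
#   log_time => "2024-07-08 21:18:01,432"
#   level    => "INFO"
#   thread   => "Statistics Job Appender|61"
#   position => "StatisticsJobAppender.runAfterCatalogReady():70"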
# 3. The output part sends the data out
# doris output writes data to Doris via the Stream Load HTTP interface. The headers parameter sets the
# Stream Load data format to JSON, and the mapping parameter maps Logstash fields to JSON fields. Because
# headers specifies "format" => "json", Stream Load automatically parses the JSON fields and writes them
# into the corresponding columns of the Doris table.
output {
doris {
http_hosts => ["http://localhost:8630"]
user => "root"
password => ""
db => "log_db"
table => "doris_log"
headers => {
"format" => "json"
"read_json_by_line" => "true"
"load_to_single_tablet" => "true"
}
mapping => {
"log_time" => "%{log_time}"
"collect_time" => "%{@timestamp}"
"host" => "%{[host][name]}"
"path" => "%{[log][file][path]}"
"type" => "%{type}"
"level" => "%{level}"
"thread" => "%{thread}"
"position" => "%{position}"
"message" => "%{message}"
}
log_request => true
}
}
4. Run Logstash
${LOGSTASH_HOME}/bin/logstash -f config/logstash_doris_log.conf
# When log_request is true, the log contains the request parameters and response of each Stream Load
[2024-07-08T22:35:34,772][INFO ][logstash.outputs.doris ][main][e44d2a24f17d764647ce56f5fed24b9bbf08d3020c7fddcc3298800daface80a] doris stream load response:
{
"TxnId": 45464,
"Label": "logstash_log_db_doris_log_20240708_223532_539_6c20a0d1-dcab-4b8e-9bc0-76b46a929bd1",
"Comment": "",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 452,
"NumberLoadedRows": 452,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 277230,
"LoadTimeMs": 1797,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 18,
"ReadDataTimeMs": 9,
"WriteDataTimeMs": 1758,
"CommitAndPublishTimeMs": 18
}
# By default, speed information is logged every 10 s, including the volume of data since startup (MB and ROWS), the cumulative speed (MB/s and R/s), and the speed over the last 10 seconds
[2024-07-08T22:35:38,285][INFO ][logstash.outputs.doris ][main] total 11 MB 18978 ROWS, total speed 0 MB/s 632 R/s, last 10 seconds speed 1 MB/s 1897 R/s
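Once data is flowing, you can sanity-check it from the Doris side, for example with a simple count against the doris_log table created above (an illustrative query):
SELECT COUNT(*) FROM log_db.doris_log;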
JSON Log Collection Example
This example demonstrates JSON log collection using data from the GitHub events archive.
1. Data
The GitHub events archive is archived data of GitHub user action events in JSON format. It can be downloaded from https://www.gharchive.org/, for example the data for 11 PM on April 1, 2024, which is the hour used throughout the rest of this example.
wget https://data.gharchive.org/2024-04-01-23.json.gz
Below is a sample record. In the actual data each record is on a single line; it is pretty-printed here for readability.
{
"id": "37066529221",
"type": "PushEvent",
"actor": {
"id": 46139131,
"login": "Bard89",
"display_login": "Bard89",
"gravatar_id": "",
"url": "https://api.github.com/users/Bard89",
"avatar_url": "https://avatars.githubusercontent.com/u/46139131?"
},
"repo": {
"id": 780125623,
"name": "Bard89/talk-to-me",
"url": "https://api.github.com/repos/Bard89/talk-to-me"
},
"payload": {
"repository_id": 780125623,
"push_id": 17799451992,
"size": 1,
"distinct_size": 1,
"ref": "refs/heads/add_mvcs",
"head": "f03baa2de66f88f5f1754ce3fa30972667f87e81",
"before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21"
},
"public": true,
"created_at": "2024-04-01T23:00:00Z"
}
2. Create the Doris Table
CREATE DATABASE log_db;
USE log_db;
CREATE TABLE github_events
(
`created_at` DATETIME,
`id` BIGINT,
`type` TEXT,
`public` BOOLEAN,
`actor.id` BIGINT,
`actor.login` TEXT,
`actor.display_login` TEXT,
`actor.gravatar_id` TEXT,
`actor.url` TEXT,
`actor.avatar_url` TEXT,
`repo.id` BIGINT,
`repo.name` TEXT,
`repo.url` TEXT,
`payload` TEXT,
`host` TEXT,
`path` TEXT,
INDEX `idx_id` (`id`) USING INVERTED,
INDEX `idx_type` (`type`) USING INVERTED,
INDEX `idx_actor.id` (`actor.id`) USING INVERTED,
INDEX `idx_actor.login` (`actor.login`) USING INVERTED,
INDEX `idx_repo.id` (`repo.id`) USING INVERTED,
INDEX `idx_repo.name` (`repo.name`) USING INVERTED,
INDEX `idx_host` (`host`) USING INVERTED,
INDEX `idx_path` (`path`) USING INVERTED,
INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"compaction_policy" = "time_series",
"enable_single_replica_compaction" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.replication_num" = "1"
);
3. Logstash Configuration
This configuration file differs from the earlier TEXT log collection in the following ways:
- The codec parameter of file input is json; Logstash parses each line of text as JSON, and the parsed fields are used in subsequent processing
- No filter plugin is used, since no additional transformation is needed
input {
file {
path => "/tmp/github_events/2024-04-01-23.json"
codec => json
}
}
output {
doris {
http_hosts => ["http://fe1:8630", "http://fe2:8630", "http://fe3:8630"]
user => "root"
password => ""
db => "log_db"
table => "github_events"
headers => {
"format" => "json"
"read_json_by_line" => "true"
"load_to_single_tablet" => "true"
}
mapping => {
"created_at" => "%{created_at}"
"id" => "%{id}"
"type" => "%{type}"
"public" => "%{public}"
"actor.id" => "%{[actor][id]}"
"actor.login" => "%{[actor][login]}"
"actor.display_login" => "%{[actor][display_login]}"
"actor.gravatar_id" => "%{[actor][gravatar_id]}"
"actor.url" => "%{[actor][url]}"
"actor.avatar_url" => "%{[actor][avatar_url]}"
"repo.id" => "%{[repo][id]}"
"repo.name" => "%{[repo][name]}"
"repo.url" => "%{[repo][url]}"
"payload" => "%{[payload]}"
"host" => "%{[host][name]}"
"path" => "%{[log][file][path]}"
}
log_request => true
}
}
4. Run Logstash
${LOGSTASH_HOME}/bin/logstash -f logstash_github_events.conf
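After the load completes, the data can be explored in Doris. For example, a breakdown of the most frequent event types (an illustrative query against the github_events table created above):
SELECT type, COUNT(*) AS cnt
FROM log_db.github_events
GROUP BY type
ORDER BY cnt DESC
LIMIT 5;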