
Fluent Bit Doris Output Plugin

Fluent Bit is a fast log processor and forwarder that can write data to storage systems through custom output plugins. The Fluent Bit Doris Output Plugin is the output plugin for Apache Doris.

The Fluent Bit Doris Output Plugin calls the Doris Stream Load HTTP API to write data into Doris in real time. It provides multi-threaded concurrency, retry on failure, customizable Stream Load formats and parameters, and periodic reporting of write throughput.
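Under the hood, each flush is a Stream Load request, i.e. an HTTP PUT against the FE (or BE) HTTP endpoint. For reference, a roughly equivalent request issued by hand with curl might look like the following sketch, assuming the FE HTTP port (8030 by default); hostname, credentials, database, table, and file are placeholders:

curl --location-trusted -u your_username:your_password \
    -H "Expect:100-continue" \
    -H "format:json" \
    -H "label:fluentbit_example_0001" \
    -T data.json \
    -XPUT http://fehost:8030/api/your_db/your_table/_stream_load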

Using the Fluent Bit Doris Output Plugin involves three main steps:

  1. Download or build a Fluent Bit binary that includes the Doris Output Plugin
  2. Configure the Fluent Bit output address and other parameters
  3. Start Fluent Bit to write data into Doris in real time

Installation (alpha version)

Download the prebuilt binary

https://github.com/joker-star-l/fluent-bit/releases/download/v3.1.9-alpha/fluent-bit
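For example, to fetch the binary and make it executable (a minimal sketch assuming a Linux x86-64 host):

wget https://github.com/joker-star-l/fluent-bit/releases/download/v3.1.9-alpha/fluent-bit
chmod +x fluent-bit
./fluent-bit --version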

Build from source

Clone the dev branch of https://github.com/joker-star-l/fluent-bit, then run the following in the build/ directory:

cmake -DFLB_RELEASE=ON ..
make

The resulting binary is build/bin/fluent-bit.
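Putting the steps together, the full build might look like this sketch (git, cmake, and make are assumed to be installed):

git clone -b dev https://github.com/joker-star-l/fluent-bit.git
cd fluent-bit/build
cmake -DFLB_RELEASE=ON ..
make
./bin/fluent-bit --version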

Configuration Parameters

The Fluent Bit Doris output plugin supports the following parameters:

| Parameter | Description |
| --- | --- |
| host | Stream Load HTTP host |
| port | Stream Load HTTP port |
| user | Doris username; the user must have import privilege on the target database and table |
| password | Password of the Doris user |
| database | Doris database to write to |
| table | Doris table to write to |
| label_prefix | Doris Stream Load label prefix; the final label is {label_prefix}_{timestamp}_{uuid}. Defaults to fluentbit; if set to false, no label is added |
| time_key | Name of the timestamp column to add to each record. Defaults to date; if set to false, the column is not added |
| header | Doris Stream Load header parameter; may be specified multiple times |
| log_request | Whether to log Doris Stream Load request and response metadata, useful for troubleshooting. Defaults to true |
| log_progress_interval | Interval in seconds for logging write speed. Defaults to 10; set to 0 to disable this log |
| retry_limit | Retry limit for failed Doris Stream Load requests. Defaults to 1; if set to false, retries are unlimited |
| workers | Number of workers performing Doris Stream Load. Defaults to 2 |
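For example, an [OUTPUT] section combining several of these parameters; header may be given on multiple lines, one per Stream Load HTTP header (all values below are illustrative placeholders):

[OUTPUT]
    name doris
    match *
    host 127.0.0.1
    port 8030
    user admin
    password admin_password
    database d1
    table t1
    # each 'header' line becomes one Stream Load HTTP header
    header format json
    header group_commit async_mode
    retry_limit 3
    workers 2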

Usage Examples

TEXT Log Collection Example

This example uses the Doris FE log to demonstrate TEXT log collection.

1. Data

FE log files are usually located at fe/log/fe.log under the Doris installation directory. They are typical Java program logs with fields such as timestamp, log level, thread name, code location, and message. Besides normal log lines there are exception logs with stacktraces; a stacktrace spans multiple lines, so collection and storage must combine the main log line and its stacktrace into a single record.

2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip
2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0}
org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive
at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT]

2. Create the Table

The table schema includes fields such as log time, collection time, hostname, log file path, log type, log level, thread name, code location, and log message.

CREATE TABLE `doris_log` (
    `log_time` datetime NULL COMMENT 'log content time',
    `collect_time` datetime NULL COMMENT 'log agent collect time',
    `host` text NULL COMMENT 'hostname or ip',
    `path` text NULL COMMENT 'log file path',
    `type` text NULL COMMENT 'log type',
    `level` text NULL COMMENT 'log level',
    `thread` text NULL COMMENT 'log thread',
    `position` text NULL COMMENT 'log code position',
    `message` text NULL COMMENT 'log message',
    INDEX idx_host (`host`) USING INVERTED COMMENT '',
    INDEX idx_path (`path`) USING INVERTED COMMENT '',
    INDEX idx_type (`type`) USING INVERTED COMMENT '',
    INDEX idx_level (`level`) USING INVERTED COMMENT '',
    INDEX idx_thread (`thread`) USING INVERTED COMMENT '',
    INDEX idx_position (`position`) USING INVERTED COMMENT '',
    INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`log_time`)
COMMENT 'OLAP'
PARTITION BY RANGE(`log_time`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
    "replication_num" = "1",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-7",
    "dynamic_partition.end" = "1",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "10",
    "dynamic_partition.create_history_partition" = "true",
    "compaction_policy" = "time_series"
);
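Once data lands in this table, the inverted indexes support term and phrase search on the text fields. A hypothetical query against the sample data above might be:

SELECT log_time, level, position, message
FROM doris_log
WHERE level = 'WARN'
  AND message MATCH_PHRASE 'Failed to run internal SQL'
ORDER BY log_time DESC
LIMIT 10;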

3. Configuration

The Fluent Bit configuration files for log collection are shown below. doris_log.conf defines the components of each ETL stage, and parsers.conf defines the log parsers.

doris_log.conf:

# config for Fluent Bit service
[SERVICE]
    log_level info
    # parsers file
    parsers_file parsers.conf

# use input tail
[INPUT]
    name tail
    path /path/to/your/log
    # add log file name to the record, key is 'path'
    path_key path
    # set multiline parser
    multiline.parser multiline_java

# parse log
[FILTER]
    match *
    name parser
    key_name log
    parser fe_log
    reserve_data true

# add host info
[FILTER]
    name sysinfo
    match *
    # add hostname to the record, key is 'host'
    hostname_key host

# output to doris
[OUTPUT]
    name doris
    match *
    host fehost
    port feport
    user your_username
    password your_password
    database your_db
    table your_table
    # add 'collect_time' to the record
    time_key collect_time
    # 'collect_time' is a unix timestamp; convert it to datetime
    header columns collect_time=from_unixtime(collect_time)
    log_request true
    log_progress_interval 10

parsers.conf:

[MULTILINE_PARSER]
    name multiline_java
    type regex
    flush_timeout 1000
    # Regex rules for multiline parsing
    # ---------------------------------
    #
    # configuration hints:
    #
    # - first state always has the name: start_state
    # - every field in the rule must be inside double quotes
    #
    # rules | state name | regex pattern | next state name
    # --------|----------------|---------------|-----------------
    rule "start_state" "/(^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2})(.*)/" "cont"
    rule "cont" "/(^(?![0-9]{4}-[0-9]{2}-[0-9]{2}))(.*)/" "cont"


[PARSER]
    name fe_log
    format regex
    # parse and add 'log_time', 'level', 'thread', 'position', 'message' to the record
    regex ^(?<log_time>[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (?<level>[^ ]+) \((?<thread>[^\)]+)\) \[(?<position>[^\]]+)\] (?<message>(\n|.)*)\n$
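Applied to the WARN line in the data sample above, this parser would yield a record roughly like the following (illustrative; the stacktrace lines are folded into message by the multiline parser):

{"log_time": "2024-07-08 21:18:53,710", "level": "WARN", "thread": "STATS_FETCH-0|208", "position": "StmtExecutor.executeInternalQuery():3332", "message": "Failed to run internal SQL: ..."}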

4. Run Fluent Bit

fluent-bit -c doris_log.conf

# log stream load response

[2024/10/31 18:39:55] [ info] [output:doris:doris.1] 127.0.0.1:8040, HTTP status=200
{
    "TxnId": 32155,
    "Label": "fluentbit_1730371195_91cca1aa-c15f-45d2-b503-fe7d2e839c2a",
    "Comment": "",
    "TwoPhaseCommit": "false",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 1,
    "NumberLoadedRows": 1,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 836,
    "LoadTimeMs": 298,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 3,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 268,
    "CommitAndPublishTimeMs": 25
}

# log speed info

[2024/10/31 18:40:13] [ info] [output:doris:doris.1] total 0 MB 2 ROWS, total speed 0 MB/s 0 R/s, last 10 seconds speed 0 MB/s 0 R/s

JSON Log Collection Example

This example uses data from the GitHub events archive to demonstrate JSON log collection.

1. Data

The GitHub events archive contains archived records of GitHub user actions in JSON format. It can be downloaded from https://www.gharchive.org/, for example the data for 15:00 on January 1, 2024:

wget https://data.gharchive.org/2024-01-01-15.json.gz
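The archive is gzip-compressed; since the tail input reads plain text, decompress it before collection:

gunzip 2024-01-01-15.json.gz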

Below is a sample record. In the actual data each record is on one line; it is pretty-printed here for readability.

{
    "id": "37066529221",
    "type": "PushEvent",
    "actor": {
        "id": 46139131,
        "login": "Bard89",
        "display_login": "Bard89",
        "gravatar_id": "",
        "url": "https://api.github.com/users/Bard89",
        "avatar_url": "https://avatars.githubusercontent.com/u/46139131?"
    },
    "repo": {
        "id": 780125623,
        "name": "Bard89/talk-to-me",
        "url": "https://api.github.com/repos/Bard89/talk-to-me"
    },
    "payload": {
        "repository_id": 780125623,
        "push_id": 17799451992,
        "size": 1,
        "distinct_size": 1,
        "ref": "refs/heads/add_mvcs",
        "head": "f03baa2de66f88f5f1754ce3fa30972667f87e81",
        "before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21"
    },
    "public": true,
    "created_at": "2024-04-01T23:00:00Z"
}

2. Create the Table

CREATE TABLE github_events
(
    `created_at` DATETIME,
    `id` BIGINT,
    `type` TEXT,
    `public` BOOLEAN,
    `actor` VARIANT,
    `repo` VARIANT,
    `payload` TEXT,
    INDEX `idx_id` (`id`) USING INVERTED,
    INDEX `idx_type` (`type`) USING INVERTED,
    INDEX `idx_actor` (`actor`) USING INVERTED,
    INDEX `idx_repo` (`repo`) USING INVERTED,
    INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
    "replication_num" = "1",
    "compaction_policy" = "time_series",
    "enable_single_replica_compaction" = "true",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.create_history_partition" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-30",
    "dynamic_partition.end" = "1",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "10",
    "dynamic_partition.replication_num" = "1"
);
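The VARIANT columns can be queried with bracket syntax on their subfields. For example, a hypothetical query for the most active repositories by push events:

SELECT cast(repo['name'] AS TEXT) AS repo_name, count(*) AS cnt
FROM github_events
WHERE type = 'PushEvent'
GROUP BY cast(repo['name'] AS TEXT)
ORDER BY cnt DESC
LIMIT 5;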

3. Configuration

Compared with the earlier TEXT log collection example, this configuration uses no FILTER because no additional transformation is needed.

github_events.conf:

[SERVICE]
    log_level info
    parsers_file github_parsers.conf

[INPUT]
    name tail
    parser github
    path /path/to/your/log

[OUTPUT]
    name doris
    match *
    host fehost
    port feport
    user your_username
    password your_password
    database your_db
    table your_table
    time_key false
    log_request true
    log_progress_interval 10

github_parsers.conf:

[PARSER]
    name github
    format json

4. Run Fluent Bit

fluent-bit -c github_events.conf
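Once the load finishes, the row count in Doris can be checked against the source file (a sketch; the MySQL client connects to the FE query port, 9030 by default, and credentials are placeholders):

# number of records in the source file (one JSON record per line)
wc -l 2024-01-01-15.json
# number of rows loaded into Doris
mysql -h fehost -P 9030 -u your_username -pyour_password -e "SELECT count(*) FROM your_db.github_events"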