BROKER-LOAD
BROKER-LOAD
Name
BROKER LOAD
描述
该命令主要用于通过 Broker 服务进程读取远端存储(如 S3、HDFS)上的数据导入到 Doris 表里。
LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH BROKER broker_name
[broker_properties]
[load_properties]
[COMMENT "comments"];
- 
load_label每个导入需要指定一个唯一的 Label。后续可以通过这个 label 来查看作业进度。 [database.]label_name
- 
data_desc1用于描述一组需要导入的文件。 [MERGE|APPEND|DELETE]
 DATA INFILE
 (
 "file_path1"[, file_path2, ...]
 )
 [NEGATIVE]
 INTO TABLE `table_name`
 [PARTITION (p1, p2, ...)]
 [COLUMNS TERMINATED BY "column_separator"]
 [LINES TERMINATED BY "line_delimiter"]
 [FORMAT AS "file_type"]
 [COMPRESS_TYPE AS "compress_type"]
 [(column_list)]
 [COLUMNS FROM PATH AS (c1, c2, ...)]
 [SET (column_mapping)]
 [PRECEDING FILTER predicate]
 [WHERE predicate]
 [DELETE ON expr]
 [ORDER BY source_sequence]
 [PROPERTIES ("key1"="value1", ...)]- 
[MERGE|APPEND|DELETE]数据合并类型。默认为 APPEND,表示本次导入是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。其中 MERGE 类型需要配合 [DELETE ON]语句使用,以标注 Delete Flag 列。而 DELETE 类型则表示本次导入的所有数据皆为删除数据。
- 
DATA INFILE指定需要导入的文件路径。可以是多个。可以使用通配符。路径最终必须匹配到文件,如果只匹配到目录则导入会失败。 
- 
NEGATIVE该关键词用于表示本次导入为一批”负“导入。这种方式仅针对具有整型 SUM 聚合类型的聚合数据表。该方式会将导入数据中,SUM 聚合列对应的整型数值取反。主要用于冲抵之前导入错误的数据。 
- 
PARTITION(p1, p2, ...)可以指定仅导入表的某些分区。不在分区范围内的数据将被忽略。 
- 
COLUMNS TERMINATED BY指定列分隔符。仅在 CSV 格式下有效。仅能指定单字节分隔符。 
- 
LINES TERMINATED BY指定行分隔符。仅在 CSV 格式下有效。仅能指定单字节分隔符。 
- 
FORMAT AS指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。 
- 
COMPRESS_TYPE AS指定文件压缩类型,支持 GZ/BZ2/LZ4FRAME。
- 
column list用于指定原始文件中的列顺序。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。 (k1, k2, tmpk1)
- 
COLUMNS FROM PATH AS指定从导入文件路径中抽取的列。 
- 
SET (column_mapping)指定列的转换函数。 
- 
PRECEDING FILTER predicate前置过滤条件。数据首先根据 column list和COLUMNS FROM PATH AS按顺序拼接成原始数据行。然后按照前置过滤条件进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。
- 
WHERE predicate根据条件对导入的数据进行过滤。关于这部分详细介绍,可以参阅 列的映射,转换与过滤 文档。 
- 
DELETE ON expr需配合 MEREGE 导入模式一起使用,仅针对 Unique Key 模型的表。用于指定导入数据中表示 Delete Flag 的列和计算关系。 
- 
ORDER BY仅针对 Unique Key 模型的表。用于指定导入数据中表示 Sequence Col 的列。主要用于导入时保证数据顺序。 
- 
PROPERTIES ("key1"="value1", ...)指定导入的 format 的一些参数。如导入的文件是 json格式,则可以在这里指定json_root、jsonpaths、fuzzy_parse等参数。- enclose
 包围符。当 csv 数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为",",包围符为"'",数据为"a,'b,c'",则"b,c"会被解析为一个字段。 注意:当 enclose 设置为 "时,trim_double_quotes 一定要设置为 true。- escape
 转义符。用于转义在字段中出现的与包围符相同的字符。例如数据为"a,'b,'c'",包围符为"'",希望"b,'c 被作为一个字段解析,则需要指定单字节转义符,例如"",然后将数据修改为"a,'b,'c'"。 
 
- 
- 
WITH BROKER broker_name指定需要使用的 Broker 服务名称。在公有云 Doris 中。Broker 服务名称为 bos
- 
broker_properties指定 broker 所需的信息。这些信息通常被用于 Broker 能够访问远端存储系统。如 BOS 或 HDFS。关于具体信息,可参阅 Broker 文档。 (
 "key1" = "val1",
 "key2" = "val2",
 ...
 )- 
load_properties指定导入的相关参数。目前支持以下参数: - 
timeout导入超时时间。默认为 4 小时。单位秒。 
- 
max_filter_ratio最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。取值范围为 0 到 1。 
- 
exec_mem_limit导入内存限制。默认为 2GB。单位为字节。 
- 
strict_mode是否对数据进行严格限制。默认为 false。 
- 
partial_columns布尔类型,为 true 表示使用部分列更新,默认值为 false,该参数只允许在表模型为 Unique 且采用 Merge on Write 时设置。 
- 
timezone指定某些受时区影响的函数的时区,如 strftime/alignment_timestamp/from_unixtime等等,具体请查阅 时区 文档。如果不指定,则使用 "Asia/Shanghai" 时区
- 
load_parallelism导入并发度,默认为 1。调大导入并发度会启动多个执行计划同时执行导入任务,加快导入速度。 
- 
send_batch_parallelism用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 max_send_batch_parallelism_per_job,那么作为协调点的 BE 将使用max_send_batch_parallelism_per_job的值。
- 
load_to_single_tablet布尔类型,为 true 表示支持一个任务只导入数据到对应分区的一个 tablet,默认值为 false,作业的任务数取决于整体并发度。该参数只允许在对带有 random 分桶的 olap 表导数的时候设置。 
- 
priority 
 设置导入任务的优先级,可选 HIGH/NORMAL/LOW三种优先级,默认为NORMAL,对于处在PENDING状态的导入任务,更高优先级的任务将优先被执行进入LOADING状态。
- 
 
- 
- 
comment 指定导入任务的备注信息。可选参数。 
该功能自 Apache Doris 1.2.3 版本起支持
举例
- 
从 HDFS 导入一批数据 LOAD LABEL example_db.label1
 (
 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file.txt")
 INTO TABLE `my_table`
 COLUMNS TERMINATED BY ","
 )
 WITH BROKER hdfs
 (
 "username"="hdfs_user",
 "password"="hdfs_password"
 );导入文件 file.txt,按逗号分隔,导入到表my_table。
- 
从 HDFS 导入数据,使用通配符匹配两批文件。分别导入到两个表中。 LOAD LABEL example_db.label2
 (
 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*")
 INTO TABLE `my_table1`
 PARTITION (p1)
 COLUMNS TERMINATED BY ","
 (k1, tmp_k2, tmp_k3)
 SET (
 k2 = tmp_k2 + 1,
 k3 = tmp_k3 + 1
 )
 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-20*")
 INTO TABLE `my_table2`
 COLUMNS TERMINATED BY ","
 (k1, k2, k3)
 )
 WITH BROKER hdfs
 (
 "username"="hdfs_user",
 "password"="hdfs_password"
 );使用通配符匹配导入两批文件 file-10*和file-20*。分别导入到my_table1和my_table2两张表中。其中my_table1指定导入到分区p1中,并且将导入源文件中第二列和第三列的值 +1 后导入。
- 
从 HDFS 导入一批数据。 LOAD LABEL example_db.label3
 (
 DATA INFILE("hdfs://hdfs_host:hdfs_port/user/doris/data/*/*")
 INTO TABLE `my_table`
 COLUMNS TERMINATED BY "\\x01"
 )
 WITH BROKER my_hdfs_broker
 (
 "username" = "",
 "password" = "",
 "fs.defaultFS" = "hdfs://my_ha",
 "dfs.nameservices" = "my_ha",
 "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
 "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
 "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
 "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
 );指定分隔符为 Hive 的默认分隔符 \\x01,并使用通配符 * 指定data目录下所有目录的所有文件。使用简单认证,同时配置 namenode HA。
- 
导入 Parquet 格式数据,指定 FORMAT 为 parquet。默认是通过文件后缀判断 LOAD LABEL example_db.label4
 (
 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file")
 INTO TABLE `my_table`
 FORMAT AS "parquet"
 (k1, k2, k3)
 )
 WITH BROKER hdfs
 (
 "username"="hdfs_user",
 "password"="hdfs_password"
 );
- 
导入数据,并提取文件路径中的分区字段 LOAD LABEL example_db.label10
 (
 DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*")
 INTO TABLE `my_table`
 FORMAT AS "csv"
 (k1, k2, k3)
 COLUMNS FROM PATH AS (city, utc_date)
 )
 WITH BROKER hdfs
 (
 "username"="hdfs_user",
 "password"="hdfs_password"
 );my_table表中的列为k1, k2, k3, city, utc_date。其中 hdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city=beijing目录下包括如下文件:hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv
 hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv
 hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv
 hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv文件中只包含 k1, k2, k3三列数据,city, utc_date这两列数据会从文件路径中提取。
- 
对待导入数据进行过滤。 LOAD LABEL example_db.label6
 (
 DATA INFILE("hdfs://host:port/input/file")
 INTO TABLE `my_table`
 (k1, k2, k3)
 SET (
 k2 = k2 + 1
 )
 PRECEDING FILTER k1 = 1
 WHERE k1 > k2
 )
 WITH BROKER hdfs
 (
 "username"="user",
 "password"="pass"
 );只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。 
- 
导入数据,提取文件路径中的时间分区字段,并且时间包含 %3A (在 hdfs 路径中,不允许有 ':',所有 ':' 会由 %3A 替换) LOAD LABEL example_db.label7
 (
 DATA INFILE("hdfs://host:port/user/data/*/test.txt")
 INTO TABLE `tbl12`
 COLUMNS TERMINATED BY ","
 (k2,k3)
 COLUMNS FROM PATH AS (data_time)
 SET (
 data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
 )
 )
 WITH BROKER hdfs
 (
 "username"="user",
 "password"="pass"
 );路径下有如下文件: /user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
 /user/data/data_time=2020-02-18 00%3A00%3A00/test.txt表结构为: data_time DATETIME,
 k2 INT,
 k3 INT
- 
从 HDFS 导入一批数据,指定超时时间和过滤比例。使用明文 my_hdfs_broker 的 broker。简单认证。并且将原有数据中与 导入数据中 v2 大于 100 的列相匹配的列删除,其他列正常导入 LOAD LABEL example_db.label8
 (
 MERGE DATA INFILE("HDFS://test:802/input/file")
 INTO TABLE `my_table`
 (k1, k2, k3, v2, v1)
 DELETE ON v2 > 100
 )
 WITH HDFS
 (
 "hadoop.username"="user",
 "password"="pass"
 )
 PROPERTIES
 (
 "timeout" = "3600",
 "max_filter_ratio" = "0.1"
 );使用 MERGE 方式导入。 my_table必须是一张 Unique Key 的表。当导入数据中的 v2 列的值大于 100 时,该行会被认为是一个删除行。导入任务的超时时间是 3600 秒,并且允许错误率在 10% 以内。 
- 
导入时指定 source_sequence 列,保证 UNIQUE_KEYS 表中的替换顺序: LOAD LABEL example_db.label9
 (
 DATA INFILE("HDFS://test:802/input/file")
 INTO TABLE `my_table`
 COLUMNS TERMINATED BY ","
 (k1,k2,source_sequence,v1,v2)
 ORDER BY source_sequence
 )
 WITH HDFS
 (
 "hadoop.username"="user",
 "password"="pass"
 )my_table必须是 Unique Key 模型表,并且指定了 Sequcence Col。数据会按照源数据中source_sequence列的值来保证顺序性。
- 
从 HDFS 导入一批数据,指定文件格式为 json并指定json_root、jsonpathsLOAD LABEL example_db.label10
 (
 DATA INFILE("HDFS://test:port/input/file.json")
 INTO TABLE `my_table`
 FORMAT AS "json"
 PROPERTIES(
 "json_root" = "$.item",
 "jsonpaths" = "[$.id, $.city, $.code]"
 )
 )
 with HDFS (
 "hadoop.username" = "user"
 "password" = ""
 )
 PROPERTIES
 (
 "timeout"="1200",
 "max_filter_ratio"="0.1"
 );jsonpaths可与column list及SET (column_mapping)配合:LOAD LABEL example_db.label10
 (
 DATA INFILE("HDFS://test:port/input/file.json")
 INTO TABLE `my_table`
 FORMAT AS "json"
 (id, code, city)
 SET (id = id * 10)
 PROPERTIES(
 "json_root" = "$.item",
 "jsonpaths" = "[$.id, $.code, $.city]"
 )
 )
 with HDFS (
 "hadoop.username" = "user"
 "password" = ""
 )
 PROPERTIES
 (
 "timeout"="1200",
 "max_filter_ratio"="0.1"
 );
- 
从腾讯云 cos 中以 csv 格式导入数据。 LOAD LABEL example_db.label10
 (
 DATA INFILE("cosn://my_bucket/input/file.csv")
 INTO TABLE `my_table`
 (k1, k2, k3)
 )
 WITH BROKER "broker_name"
 (
 "fs.cosn.userinfo.secretId" = "xxx",
 "fs.cosn.userinfo.secretKey" = "xxxx",
 "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
 )
- 
导入 CSV 数据时去掉双引号,并跳过前 5 行。 LOAD LABEL example_db.label12
 (
 DATA INFILE("cosn://my_bucket/input/file.csv")
 INTO TABLE `my_table`
 (k1, k2, k3)
 PROPERTIES("trim_double_quotes" = "true", "skip_lines" = "5")
 )
 WITH BROKER "broker_name"
 (
 "fs.cosn.userinfo.secretId" = "xxx",
 "fs.cosn.userinfo.secretKey" = "xxxx",
 "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
 )
Keywords
BROKER, LOAD
Best Practice
- 
查看导入任务状态 Broker Load 是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过 SHOW LOAD 命令查看。 
- 
取消导入任务 已提交切尚未结束的导入任务可以通过 CANCEL LOAD 命令取消。取消后,已写入的数据也会回滚,不会生效。 
- 
Label、导入事务、多表原子性 Doris 中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,Doris 还可以通过 Label 的机制来保证数据导入的不丢不重。具体说明可以参阅 导入事务和原子性 文档。 
- 
列映射、衍生列和过滤 Doris 可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和 UDF。关于如何正确的使用这个功能,可参阅 列的映射,转换与过滤 文档。 
- 
错误数据过滤 Doris 的导入任务可以容忍一部分格式错误的数据。容忍了通过 max_filter_ratio设置。默认为 0,即表示当有一条错误数据时,整个导入任务将会失败。如果用户希望忽略部分有问题的数据行,可以将次参数设置为 0~1 之间的数值,Doris 会自动跳过哪些数据格式不正确的行。关于容忍率的一些计算方式,可以参阅 列的映射,转换与过滤 文档。 
- 
严格模式 strict_mode属性用于设置导入任务是否运行在严格模式下。该格式会对列映射、转换和过滤的结果产生影响。关于严格模式的具体说明,可参阅 严格模式 文档。
- 
超时时间 Broker Load 的默认超时时间为 4 小时。从任务提交开始算起。如果在超时时间内没有完成,则任务会失败。 
- 
数据量和任务数限制 Broker Load 适合在一个导入任务中导入 100GB 以内的数据。虽然理论上在一个导入任务中导入的数据量没有上限。但是提交过大的导入会导致运行时间较长,并且失败后重试的代价也会增加。 同时受限于集群规模,我们限制了导入的最大数据量为 ComputeNode 节点数 * 3GB。以保证系统资源的合理利用。如果有大数据量需要导入,建议分成多个导入任务提交。 Doris 同时会限制集群内同时运行的导入任务数量,通常在 3-10 个不等。之后提交的导入作业会排队等待。队列最大长度为 100。之后的提交会直接拒绝。注意排队时间也被计算到了作业总时间中。如果超时,则作业会被取消。所以建议通过监控作业运行状态来合理控制作业提交频率。