跳到主要内容

CREATE STREAMING JOB

描述

Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。

语法

CREATE JOB <job_name>
ON STREAMING
[ PROPERTIES (
<job_property>
[ , ... ]
)
]
[ COMMENT <comment> ]
(
DO <Insert_Command>
|
(
FROM <sourceType> (
<source_property>
[ , ... ])
TO DATABASE <target_db>
[ PROPERTIES (
<target_property>
-- Other属性
[ , ... ])
]
)

必选参数

1. <job_name>

作业名称,它在一个 db 中标识唯一事件。JOB 名称必须是全局唯一的,如果已经存在同名的 JOB,则会报错。

2. <Insert_Command>

DO 子句,它指定了 Job 作业触发时需要执行的操作,即一条 SQL 语句,目前只支持 S3 TVF。

3. <sourceType>

支持的数据源,目前只支持 MySQL 和 Postgres。

4. <source_property>

参数默认值说明
jdbc_url-JDBC 连接串(MySQL/PG)
driver_url-JDBC 驱动 jar 包路径
driver_class-JDBC 驱动类名
user-数据库用户名
password-数据库密码
database-数据库名
schema-schema 名称
include_tables-需要同步的表名,多个表用逗号分隔
offsetinitialinitial: 全量 + 增量同步,latest: 仅增量同步
snapshot_split_size8096split 的大小 (行数),全量同步时,表会被切分成多个 split 进行同步
snapshot_parallelism1全量阶段同步的并行度,即单次 Task 最多调度的 split 数量

5. <target_db>

需要导入的 Doris 目标库名称。

6. <target_property>

参数默认值说明
table.create.properties.*-支持创建表的时候指定 table 的 properties,比如 replication_num
load.strict_mode-是否开启严格模式,默认为关闭
load.max_filter_ratio-采样窗口内,允许的最大过滤率。必须在大于等于 0 到小于等于 1 之间。默认值是 0,表示零容忍。采样窗口为 max_interval * 10。即如果在采样窗口内,错误行数/总行数大于 max_filter_ratio,则会导致例行作业被暂停,需要人工介入检查数据质量问题。

可选参数

1. <job_property>

参数默认值说明
session.*支持在 job_properties 上配置所有的 session 变量
s3.max_batch_files256当累计文件数达到该值时触发一次导入写入
s3.max_batch_bytes10G当累计数据量达到该值时触发一次导入写入
max_interval10s当上游没有新增文件或数据时,空闲的调度间隔。

权限控制

执行此 SQL 命令的用户必须至少具有以下权限:

权限(Privilege)对象(Object)说明(Notes)
LOAD_PRIV数据库(DB)目前仅支持 LOAD 权限执行此操作

注意事项

  • TASK 只保留最新的 100 条记录。

  • 目前 Insert_Command 仅支持 INSERT 内表 Select * From S3(...) 操作,后续会支持更多的操作。

示例

  • 创建一个名为 my_job 的作业,持续监听 S3 上的指定目录的文件,执行的操作是将 csv 结尾的文件中的数据导入到 db1.tbl1 中。

    CREATE JOB my_job
    ON STREAMING
    DO
    INSERT INTO db1.`tbl1`
    SELECT * FROM S3
    (
    "uri" = "s3://bucket/s3/demo/*.csv",
    "format" = "csv",
    "column_separator" = ",",
    "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
    "s3.region" = "ap-southeast-1",
    "s3.access_key" = "",
    "s3.secret_key" = ""
    );
  • 创建一个名为 my_job 的作业,从头开始同步 MySQL 上游的 user_info,order_info 表的数据,到 target_test_db 库下面。

    CREATE JOB multi_table_sync
    ON STREAMING
    FROM MYSQL (
    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
    "driver_url" = "mysql-connector-j-8.0.31.jar",
    "driver_class" = "com.mysql.cj.jdbc.Driver",
    "user" = "root",
    "password" = "123456",
    "database" = "test",
    "include_tables" = "user_info,order_info",
    "offset" = "initial"
    )
    TO DATABASE target_test_db (
    "table.create.properties.replication_num" = "1"
    )
  • 创建一个名为 my_job 的作业,持续同步 Postgres 上游的 test_tbls 表的增量的数据,到 target_test_db 库下面。

    CREATE JOB test_postgres_job
    ON STREAMING
    FROM POSTGRES (
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "include_tables" = "test_tbls",
    "offset" = "latest"
    )
    TO DATABASE target_test_db (
    "table.create.properties.replication_num" = "1"
    )

CONFIG

fe.conf

参数默认值
max_streaming_job_num1024最大的 Streaming 作业数量
job_streaming_task_exec_thread_num10用于执行 StreamingTask 的线程数
max_streaming_task_show_count100StreamingTask 在内存中最多保留的 task 执行记录