CREATE STREAMING JOB

Description

Doris Streaming Job is a continuous import task based on the Job approach. After the Job is submitted, Doris continuously runs the import job, querying TVF or upstream data sources in real time and writing the data into Doris tables.

Syntax

CREATE JOB <job_name>
ON STREAMING
[ PROPERTIES (
    <job_property>
    [ , ... ]
) ]
[ COMMENT <comment> ]
(
    DO <Insert_Command>
    |
    FROM <sourceType> (
        <source_property>
        [ , ... ]
    )
    TO DATABASE <target_db>
    [ PROPERTIES (
        <target_property>
        [ , ... ]
    ) ]
)

Required Parameters

1. <job_name>

The job name, which uniquely identifies the job. Job names must be unique; if a job with the same name already exists, an error is reported.

2. <Insert_Command>

The DO clause specifies the operation to execute when the job is triggered, i.e., an SQL statement. Currently, only the S3 TVF is supported as the data source of this statement.

3. <sourceType>

Supported data sources: currently only MySQL and Postgres.

4. <source_property>

| Parameter | Default | Description |
| --- | --- | --- |
| jdbc_url | - | JDBC connection string (MySQL/PG) |
| driver_url | - | JDBC driver jar path |
| driver_class | - | JDBC driver class name |
| user | - | Database username |
| password | - | Database password |
| database | - | Database name |
| schema | - | Schema name |
| include_tables | - | Tables to synchronize, comma separated |
| offset | initial | initial: full + incremental sync; latest: incremental only |
| snapshot_split_size | 8096 | The size (in number of rows) of each split. During full synchronization, a table is divided into multiple splits for synchronization. |
| snapshot_parallelism | 1 | The parallelism level during the full synchronization phase, i.e., the maximum number of splits a single task can schedule at once. |
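To make the interaction between snapshot_split_size and snapshot_parallelism concrete, here is a minimal sketch (not Doris code; the function name and return shape are invented for illustration) of how a table might be divided into splits during full synchronization and scheduled in rounds:

```python
import math

def plan_snapshot(total_rows, snapshot_split_size=8096, snapshot_parallelism=1):
    """Divide a table of `total_rows` rows into splits of at most
    `snapshot_split_size` rows, then group the splits into scheduling
    rounds of at most `snapshot_parallelism` splits each."""
    num_splits = math.ceil(total_rows / snapshot_split_size)
    # Each split is a half-open row range [start, end).
    splits = [(i * snapshot_split_size,
               min((i + 1) * snapshot_split_size, total_rows))
              for i in range(num_splits)]
    # With the default parallelism of 1, one split is scheduled per round.
    rounds = [splits[i:i + snapshot_parallelism]
              for i in range(0, len(splits), snapshot_parallelism)]
    return splits, rounds

# 20000 rows at 8096 rows per split -> 3 splits, 3 rounds by default.
splits, rounds = plan_snapshot(20000)
```

Raising snapshot_parallelism shortens the full-sync phase by letting a task work on several splits per round, at the cost of more load on the upstream database.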

5. <target_db>

The name of the target Doris database to import into.

6. <target_property>

| Parameter | Default | Description |
| --- | --- | --- |
| table.create.properties.* | - | Table properties used when creating tables, e.g. replication_num |
| load.strict_mode | false | Whether to enable strict mode. Disabled by default. |
| load.max_filter_ratio | 0 | The maximum allowed filtering ratio within a sampling window. Must be between 0 and 1 (inclusive). The default value is 0, indicating zero tolerance. The sampling window equals max_interval * 10. If, within this window, the ratio of erroneous rows to total rows exceeds max_filter_ratio, the streaming job is paused and requires manual intervention to address data quality issues. |
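The pause condition for load.max_filter_ratio can be sketched as follows (a simplified illustration, not the actual Doris implementation; the function name is invented):

```python
def should_pause(error_rows, total_rows, max_filter_ratio=0.0):
    """Pause the job when the ratio of erroneous rows to total rows
    observed within the sampling window exceeds max_filter_ratio.
    The default of 0 means zero tolerance: any bad row pauses the job."""
    if total_rows == 0:
        return False  # nothing sampled yet
    return error_rows / total_rows > max_filter_ratio

# With the default max_interval of 10s, the sampling window is
# max_interval * 10 = 100 seconds of load activity.
should_pause(3, 1000)                          # ratio 0.003 > 0 -> pause
should_pause(3, 1000, max_filter_ratio=0.01)   # 0.003 <= 0.01  -> keep running
```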

Optional Parameters

1. <job_property>

| Parameter | Default | Description |
| --- | --- | --- |
| session.* | None | Supports configuring all session variables in job_properties |
| s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value |
| s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value |
| max_interval | 10s | Idle scheduling interval when there are no new files or data upstream |
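The batching thresholds combine as "whichever is reached first". A minimal sketch of the trigger condition (illustrative only; the function name and defaults are spelled out from the table above, not taken from Doris source):

```python
def should_flush(pending_files, pending_bytes,
                 max_batch_files=256, max_batch_bytes=10 * 1024**3):
    """An import write is triggered once the accumulated file count or
    the accumulated data volume reaches its threshold, whichever
    comes first."""
    return pending_files >= max_batch_files or pending_bytes >= max_batch_bytes

should_flush(256, 0)             # file-count threshold reached -> flush
should_flush(10, 10 * 1024**3)   # byte threshold reached       -> flush
should_flush(10, 1024)           # neither reached -> wait; if nothing new
                                 # arrives, the job idles for max_interval
```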

Privilege Control

The user executing this SQL command must have at least the following privileges:

| Privilege | Object | Notes |
| --- | --- | --- |
| LOAD_PRIV | Database | Currently, only the LOAD privilege is supported for this operation |

Notes

  • A job's TASK history only retains the latest 100 records.
  • Currently, Insert_Command only supports INSERT INTO <internal table> SELECT * FROM S3(...); more operations will be supported in the future.

Examples

  • Create a job named my_job that continuously monitors files in a specified directory on S3 and imports data from files ending in .csv into db1.tbl1.

    CREATE JOB my_job
    ON STREAMING
    DO
    INSERT INTO db1.`tbl1`
    SELECT * FROM S3
    (
    "uri" = "s3://bucket/s3/demo/*.csv",
    "format" = "csv",
    "column_separator" = ",",
    "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
    "s3.region" = "ap-southeast-1",
    "s3.access_key" = "",
    "s3.secret_key" = ""
    );
  • Create a job named multi_table_sync to synchronize user_info and order_info tables from MySQL upstream to the target_test_db database from the beginning.

    CREATE JOB multi_table_sync
    ON STREAMING
    FROM MYSQL (
    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
    "driver_url" = "mysql-connector-j-8.0.31.jar",
    "driver_class" = "com.mysql.cj.jdbc.Driver",
    "user" = "root",
    "password" = "123456",
    "database" = "test",
    "include_tables" = "user_info,order_info",
    "offset" = "initial"
    )
    TO DATABASE target_test_db (
    "table.create.properties.replication_num" = "1"
    )
  • Create a job named test_postgres_job to continuously synchronize incremental data from the test_tbls table in Postgres upstream to the target_test_db database.

    CREATE JOB test_postgres_job
    ON STREAMING
    FROM POSTGRES (
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "include_tables" = "test_tbls",
    "offset" = "latest"
    )
    TO DATABASE target_test_db (
    "table.create.properties.replication_num" = "1"
    )

CONFIG

fe.conf

| Parameter | Default | Description |
| --- | --- | --- |
| max_streaming_job_num | 1024 | Maximum number of streaming jobs |
| job_streaming_task_exec_thread_num | 10 | Number of threads for executing streaming tasks |
| max_streaming_task_show_count | 100 | Maximum number of streaming task records kept in memory |