SeaTunnel Doris Sink
About SeaTunnel
SeaTunnel is an easy-to-use, high-performance distributed data integration platform that supports real-time synchronization of massive data. It can stably and efficiently synchronize tens of billions of records every day.
Connector-V2
SeaTunnel Connector-V2 has supported the Doris sink since version 2.3.1, including exactly-once writes and CDC data synchronization.
Plugin Code
SeaTunnel Doris Sink Plugin Code
Options
| name | type | required | default value | 
|---|---|---|---|
| fenodes | string | yes | - | 
| username | string | yes | - | 
| password | string | yes | - | 
| table.identifier | string | yes | - | 
| sink.label-prefix | string | yes | - | 
| sink.enable-2pc | bool | no | true | 
| sink.enable-delete | bool | no | false | 
| doris.config | map | yes | - | 
fenodes [string]
Address of the Doris cluster FE nodes, in the format "fe_ip:fe_http_port, ..."
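To illustrate this format, the following minimal Python sketch splits a `fenodes` value into (host, port) pairs. `parse_fenodes` is a hypothetical helper and is not part of SeaTunnel. It assumes every entry has the form `host:port` with a numeric port.

```python
from typing import List, Tuple


def parse_fenodes(fenodes: str) -> List[Tuple[str, int]]:
    """Split a value like "fe1:8030, fe2:8030" into (host, port) pairs."""
    nodes: List[Tuple[str, int]] = []
    for entry in fenodes.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray spaces and a trailing comma
        # Split on the last colon so only the port is taken from the right.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected fe_ip:fe_http_port, got {entry!r}")
        nodes.append((host, int(port)))
    if not nodes:
        raise ValueError("fenodes must list at least one FE node")
    return nodes


print(parse_fenodes("doris_fe:8030, 192.168.0.2:8030"))
# → [('doris_fe', 8030), ('192.168.0.2', 8030)]
```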
username [string]
Username of the Doris user.
password [string]
Password of the Doris user.
table.identifier [string]
Name of the Doris table, in the format DBName.TableName.
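The following minimal Python sketch shows how `table.identifier` could be combined with one FE node to address Doris Stream Load. The path `/api/{db}/{table}/_stream_load` is the Doris Stream Load HTTP endpoint. `stream_load_url` is a hypothetical helper, and splitting the identifier on its first dot is an assumption about how SeaTunnel parses it.

```python
def stream_load_url(fe_node: str, table_identifier: str) -> str:
    # table.identifier is "DBName.TableName"; split on the first dot only.
    db, sep, table = table_identifier.partition(".")
    if not sep or not db or not table:
        raise ValueError(f"expected DBName.TableName, got {table_identifier!r}")
    # Doris serves Stream Load over HTTP on the FE at /api/{db}/{table}/_stream_load.
    return f"http://{fe_node}/api/{db}/{table}/_stream_load"


print(stream_load_url("doris_fe:8030", "test.table_sink"))
# → http://doris_fe:8030/api/test/table_sink/_stream_load
```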
sink.label-prefix [string]
Label prefix used by Stream Load imports. When two-phase commit is enabled, the prefix must be globally unique so that SeaTunnel can guarantee exactly-once semantics (EOS).
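A hypothetical labeling scheme shows why the prefix must be unique. Suppose each label is the prefix followed by a writer index and a checkpoint id. The `make_label` format below is an assumption for illustration only. It is not SeaTunnel's documented label format.

```python
def make_label(prefix: str, subtask_index: int, checkpoint_id: int) -> str:
    # Hypothetical scheme: the prefix identifies the job, the subtask index
    # identifies the parallel writer, and the checkpoint id identifies one
    # transaction of that writer.
    return f"{prefix}_{subtask_index}_{checkpoint_id}"


job_a = {make_label("test_json", s, c) for s in range(2) for c in range(1, 4)}
job_b = {make_label("test_json", s, c) for s in range(2) for c in range(1, 4)}
print(len(job_a), len(job_a & job_b))  # → 6 6
```

Every label within one job is distinct. A second job that reuses the prefix `test_json`, however, produces exactly the same labels. Doris rejects a load whose label already exists in the database, so the two jobs would interfere with each other.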
sink.enable-2pc [bool]
Whether to enable two-phase commit (2PC) to guarantee exactly-once semantics. The default is true. For details on two-phase commit, see here.
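The two phases correspond to the Doris Stream Load 2PC API. In the first phase, the data is loaded with the header `two_phase_commit: true`. This leaves the transaction pre-committed and returns its transaction id. In the second phase, the transaction is committed or aborted through the `/api/{db}/_stream_load_2pc` endpoint, using the `txn_id` and `txn_operation` headers.

The Python sketch below only builds these requests and sends nothing. Both real requests are authenticated HTTP `PUT`s. The helper names are hypothetical. Tying phase one to a SeaTunnel checkpoint and phase two to checkpoint completion is an assumption about the sink's internals.

```python
from typing import Dict, Tuple


def precommit_headers(label: str) -> Dict[str, str]:
    # Phase 1: load the data but leave the transaction uncommitted.
    # Doris replies with the transaction id (TxnId) needed in phase 2.
    return {"label": label, "two_phase_commit": "true"}


def finish_request(fe_node: str, db: str, txn_id: int, commit: bool) -> Tuple[str, Dict[str, str]]:
    # Phase 2: commit on success, or abort if the job must roll back.
    url = f"http://{fe_node}/api/{db}/_stream_load_2pc"
    headers = {"txn_id": str(txn_id), "txn_operation": "commit" if commit else "abort"}
    return url, headers


print(precommit_headers("test_json_0_1"))
print(finish_request("doris_fe:8030", "test", 18036, commit=True))
```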
sink.enable-delete [bool]
Whether to enable deletion. This option requires the batch delete feature to be enabled on the Doris table (it is enabled by default in Doris 0.15 and later), and it only supports the Unique model. More details are available at this link:
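Doris batch delete works through the hidden column `__DORIS_DELETE_SIGN__`. On a Unique-model table, a loaded row whose delete sign is 1 removes the existing row with the same key.

The following minimal Python sketch shows how CDC change types could be mapped to that sign. The `RowKind` enum and the `with_delete_sign` helper are hypothetical stand-ins. Treating `UPDATE_BEFORE` as a delete is an assumption, and the sketch does not show how the sign column is declared in the Stream Load request.

```python
from enum import Enum
from typing import Any, Dict

DELETE_SIGN = "__DORIS_DELETE_SIGN__"  # Doris hidden column used by batch delete


class RowKind(Enum):
    # Hypothetical stand-in for the change types a CDC source emits.
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


def with_delete_sign(row: Dict[str, Any], kind: RowKind) -> Dict[str, Any]:
    # Changes that remove a row get sign 1; inserts and new row images get sign 0.
    sign = 1 if kind in (RowKind.DELETE, RowKind.UPDATE_BEFORE) else 0
    return {**row, DELETE_SIGN: sign}  # returns a copy; the input row is untouched


print(with_delete_sign({"id": 7, "name": "a"}, RowKind.DELETE))
# → {'id': 7, 'name': 'a', '__DORIS_DELETE_SIGN__': 1}
```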
doris.config [map]
Parameters of the Stream Load data_desc. More details are available at this link:
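Stream Load receives its parameters as HTTP headers and authenticates with HTTP Basic auth. The following minimal Python sketch assumes that each `doris.config` entry is forwarded unchanged as a header of the same name. `stream_load_headers` is a hypothetical helper, not a SeaTunnel API.

```python
import base64
from typing import Any, Dict


def stream_load_headers(username: str, password: str, doris_config: Dict[str, Any]) -> Dict[str, str]:
    # HTTP Basic auth: base64 of "username:password".
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    # Doris Stream Load examples also send "Expect: 100-continue".
    headers = {"Authorization": f"Basic {token}", "Expect": "100-continue"}
    # Assumed behavior: every doris.config entry becomes a header of the same name.
    headers.update({key: str(value) for key, value in doris_config.items()})
    return headers


print(stream_load_headers("root", "", {"format": "json", "read_json_by_line": "true"}))
```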
Example
Use JSON format to import data
```hocon
sink {
    Doris {
        fenodes = "doris_fe:8030"
        username = root
        password = ""
        table.identifier = "test.table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_json"
        doris.config = {
            format = "json"
            read_json_by_line = "true"
        }
    }
}
```
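With `format="json"` and `read_json_by_line="true"`, Doris parses the request body as one JSON object per line. The minimal Python sketch below produces a body of that shape. `to_json_lines` is a hypothetical helper, and the sample rows are illustrative.

```python
import json
from typing import Any, Dict, Iterable


def to_json_lines(rows: Iterable[Dict[str, Any]]) -> bytes:
    # One JSON object per line, as read_json_by_line="true" expects.
    return "\n".join(json.dumps(row, ensure_ascii=False) for row in rows).encode("utf-8")


payload = to_json_lines([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])
print(payload.decode("utf-8"))
```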
Use CSV format to import data
```hocon
sink {
    Doris {
        fenodes = "doris_fe:8030"
        username = root
        password = ""
        table.identifier = "test.table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_csv"
        doris.config = {
            format = "csv"
            column_separator = ","
        }
    }
}
```
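With `format = "csv"`, each row is one line and its fields are joined by `column_separator`. The following minimal Python sketch assumes plain CSV without any quoting or enclosing characters. Under that assumption, a value containing the separator cannot be represented, so the sketch rejects it. Writing `\N` for NULL follows the Doris CSV convention. `to_csv_body` is a hypothetical helper.

```python
from typing import Any, Iterable, Sequence


def to_csv_body(rows: Iterable[Sequence[Any]], column_separator: str = ",") -> bytes:
    lines = []
    for row in rows:
        # \N is how Doris CSV loads represent NULL.
        fields = ["\\N" if value is None else str(value) for value in row]
        for field in fields:
            if column_separator in field:
                # No quoting in this sketch, so an embedded separator would split the field.
                raise ValueError(f"value {field!r} contains the separator {column_separator!r}")
        lines.append(column_separator.join(fields))
    return "\n".join(lines).encode("utf-8")


print(to_csv_body([(1, "a"), (2, None)]).decode("utf-8"))
```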
Connector-V1
Flink Sink Doris
Plugin Code
SeaTunnel Flink Sink Doris plugin code
Options
| name | type | required | default value | engine | 
|---|---|---|---|---|
| fenodes | string | yes | - | Flink | 
| database | string | yes | - | Flink | 
| table | string | yes | - | Flink | 
| user | string | yes | - | Flink | 
| password | string | yes | - | Flink | 
| batch_size | int | no | 100 | Flink | 
| interval | int | no | 1000 | Flink | 
| max_retries | int | no | 1 | Flink | 
| doris.* | - | no | - | Flink | 
fenodes [string]
Doris FE HTTP address, e.g. 127.0.0.1:8030
database [string]
Name of the Doris database
table [string]
Name of the Doris table
user [string]
Doris username
password [string]
Password of the Doris user
batch_size [int]
Maximum number of rows written to Doris in a single batch. The default value is 100.
interval [int]
Flush interval in milliseconds. When the interval elapses, an asynchronous thread writes the cached data to Doris. Set to 0 to disable periodic writes.
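The interaction between `batch_size` and `interval` can be sketched as a buffer that flushes in two cases. It flushes as soon as it holds `batch_size` rows. It also flushes on a periodic check once `interval` milliseconds have passed since the last flush.

This minimal Python sketch assumes that a caller invokes `tick()` periodically in place of the asynchronous thread. It takes an injectable clock so that it can be tested. `BatchBuffer` is hypothetical and is not the plugin's class.

```python
import time
from typing import Callable, List, Optional


class BatchBuffer:
    """Hypothetical buffer mimicking batch_size and interval (milliseconds)."""

    def __init__(self, batch_size: int = 100, interval_ms: int = 1000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.batch_size = batch_size
        self.interval_ms = interval_ms
        self.clock = clock  # returns seconds; injectable for testing
        self.rows: List[str] = []
        self.last_flush = clock()

    def add(self, row: str) -> Optional[List[str]]:
        # Size-triggered flush: hand back a full batch as soon as it is ready.
        self.rows.append(row)
        if len(self.rows) >= self.batch_size:
            return self.flush()
        return None

    def tick(self) -> Optional[List[str]]:
        # Time-triggered flush, called periodically; interval 0 disables it.
        if self.interval_ms <= 0 or not self.rows:
            return None
        if (self.clock() - self.last_flush) * 1000 >= self.interval_ms:
            return self.flush()
        return None

    def flush(self) -> List[str]:
        batch, self.rows = self.rows, []
        self.last_flush = self.clock()
        return batch  # the caller would write this batch to Doris


buffer = BatchBuffer(batch_size=5, interval_ms=5000)  # as in the Socket To Doris example
for i in range(7):
    batch = buffer.add(f"row-{i}")
    if batch is not None:
        print("flush", batch)
```

The size trigger keeps each Stream Load request bounded. The timer bounds latency when rows arrive slowly.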
max_retries [int]
Number of retries after a write to Doris fails.
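The following minimal Python sketch shows the retry behavior. It assumes that `max_retries` counts retries after the first failed attempt, so `max_retries = 1` allows two attempts in total. `write_with_retries` is hypothetical. A production sink would usually also back off between attempts and retry only errors that are safe to repeat.

```python
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


def write_with_retries(write: Callable[[], T], max_retries: int = 1) -> T:
    last_error: Optional[Exception] = None
    # One initial attempt plus up to max_retries retries.
    for _ in range(max_retries + 1):
        try:
            return write()
        except Exception as err:
            last_error = err
    raise RuntimeError(f"write to Doris failed after {max_retries + 1} attempts") from last_error


attempts: List[int] = []


def flaky_write() -> str:
    # Simulated write that fails once, then succeeds.
    attempts.append(1)
    if len(attempts) < 2:
        raise IOError("simulated Stream Load failure")
    return "ok"


print(write_with_retries(flaky_write, max_retries=1), len(attempts))  # → ok 2
```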
doris.* [string]
Stream Load import parameters, specified with the 'doris.' prefix. For example: 'doris.column_separator' = ', '.
More Stream Load parameter configuration
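The `doris.` prefix marks an option as a Stream Load parameter. A natural reading is that the prefix is stripped before the parameter is sent. The following minimal Python sketch shows that filtering, using option names taken from this page's examples. `stream_load_props` is a hypothetical helper, and forwarding the values as strings is an assumption.

```python
from typing import Any, Dict

PREFIX = "doris."


def stream_load_props(options: Dict[str, Any]) -> Dict[str, str]:
    # Keep only doris.* options and strip the prefix:
    # doris.column_separator -> column_separator.
    return {
        key[len(PREFIX):]: str(value)
        for key, value in options.items()
        if key.startswith(PREFIX) and len(key) > len(PREFIX)
    }


sink_options = {
    "fenodes": "127.0.0.1:8030",
    "batch_size": 5,
    "doris.column_separator": ",",
    "doris.columns": "date_key,date_value",
}
print(stream_load_props(sink_options))
# → {'column_separator': ',', 'columns': 'date_key,date_value'}
```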
Examples
Socket To Doris
```hocon
env {
  execution.parallelism = 1
}

source {
  SocketStream {
    host = 127.0.0.1
    port = 9999
    result_table_name = "socket"
    field_name = "info"
  }
}

transform {
}

sink {
  DorisSink {
    fenodes = "127.0.0.1:8030"
    user = root
    password = 123456
    database = test
    table = test_tbl
    batch_size = 5
    max_retries = 1
    interval = 5000
  }
}
```
Start command
```shell
sh bin/start-seatunnel-flink.sh --config config/flink.streaming.conf
```
Spark Sink Doris
Plugin Code
SeaTunnel Spark Sink Doris plugin code
Options
| name | type | required | default value | engine | 
|---|---|---|---|---|
| fenodes | string | yes | - | Spark | 
| database | string | yes | - | Spark | 
| table | string | yes | - | Spark | 
| user | string | yes | - | Spark | 
| password | string | yes | - | Spark | 
| batch_size | int | yes | 100 | Spark | 
| doris.* | string | no | - | Spark | 
fenodes [string]
Doris FE address with its HTTP port, in the form fe_ip:8030
database [string]
Doris target database name
table [string]
Doris target table name
user [string]
Doris username
password [string]
Doris user's password
batch_size [int]
Number of records submitted to Doris in each batch
doris.* [string]
Doris Stream Load properties. Any Stream Load property can be set by adding the 'doris.' prefix to its name, e.g. doris.column_separator
More Doris stream_load Configurations
Examples
Hive to Doris
Config properties
```hocon
env {
  spark.app.name = "hive2doris-template"
}

spark {
  spark.sql.catalogImplementation = "hive"
}

source {
  hive {
    preSql = "select * from tmp.test"
    result_table_name = "test"
  }
}

transform {
}

sink {
  Console {
  }

  Doris {
    fenodes = "xxxx:8030"
    database = "gl_mint_dim"
    table = "dim_date"
    user = "root"
    password = "root"
    batch_size = 1000
    doris.column_separator = "\t"
    doris.columns = "date_key,date_value,day_in_year,day_in_month"
  }
}
```
Start command
```shell
sh bin/start-seatunnel-spark.sh --master local[4] --deploy-mode client --config ./config/spark.conf
```