Migrating Data from Other OLTP Systems

There are several ways to migrate data to Doris from other TP (transaction processing) systems such as MySQL, SQL Server, and Oracle.


Multi-Catalog

Use a Catalog to map the source table as an external table, then load the data with an INSERT INTO or CREATE TABLE AS SELECT (CTAS) statement.

For example, with MySQL:

CREATE CATALOG mysql_catalog PROPERTIES (
    'type' = 'jdbc',
    'user' = 'root',
    'password' = '123456',
    'jdbc_url' = 'jdbc:mysql://host:3306/mysql_db',
    'driver_url' = 'mysql-connector-java-8.0.25.jar',
    'driver_class' = 'com.mysql.cj.jdbc.Driver'
);

-- Load via INSERT (the target table must already exist)
INSERT INTO internal.doris_db.tbl1
SELECT * FROM mysql_catalog.mysql_db.table1;

-- Load via CTAS (creates the target table from the query result)
CREATE TABLE internal.doris_db.tbl1
PROPERTIES('replication_num' = '1')
AS
SELECT * FROM mysql_catalog.mysql_db.table1;

For more details, refer to Catalog Data Load.
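Before and after a load, it can help to inspect the mapped tables and compare row counts on both sides. The statements below are a minimal sketch, assuming the mysql_catalog defined above exposes a database mysql_db with a table table1; the column names id and name and the id filter are hypothetical and only illustrate loading a subset of columns and rows.

```sql
-- Assumption: mysql_catalog, mysql_db, and table1 exist as in the example above.
-- The columns id and name are hypothetical placeholders.

-- Inspect what the catalog exposes.
SWITCH mysql_catalog;
SHOW DATABASES;
SHOW TABLES FROM mysql_db;

-- Row count on the source side, read through the catalog.
SELECT COUNT(*) FROM mysql_catalog.mysql_db.table1;

-- Load only selected columns and rows into the existing Doris table.
INSERT INTO internal.doris_db.tbl1 (id, name)
SELECT id, name
FROM mysql_catalog.mysql_db.table1
WHERE id > 1000;

-- Row count on the Doris side, to compare against the filtered source.
SELECT COUNT(*) FROM internal.doris_db.tbl1;
```

Listing columns explicitly instead of using SELECT * keeps the load stable if the source table later gains columns that the Doris table does not have.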

Flink Doris Connector

You can use Flink for both offline and real-time synchronization from TP systems.

  • Offline synchronization: read from the source with Flink's JDBC Source and write to Doris with the Doris Sink. For example, using Flink SQL:

    CREATE TABLE student_source (
        id INT,
        name STRING,
        age INT
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/mydatabase',
        'table-name' = 'students',
        'username' = 'username',
        'password' = 'password'
    );

    CREATE TABLE student_sink (
        id INT,
        name STRING,
        age INT
    ) WITH (
        'connector' = 'doris',
        'fenodes' = '',
        'table.identifier' = 'test.students',
        'username' = 'root',
        'password' = 'password',
        'sink.label-prefix' = 'doris_label'
    );

    INSERT INTO student_sink SELECT * FROM student_source;

    For more details, refer to Flink JDBC.

  • Real-time synchronization: use Flink CDC to read both the full snapshot and the incremental changes. For example, using Flink SQL:

    SET 'execution.checkpointing.interval' = '10s';

    CREATE TABLE cdc_mysql_source (
        id INT,
        name STRING
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '',
        'port' = '3306',
        'username' = 'root',
        'password' = 'password',
        'database-name' = 'database',
        'table-name' = 'table'
    );

    -- Supports synchronization of insert/update/delete events.
    CREATE TABLE doris_sink (
        id INT,
        name STRING
    ) WITH (
        'connector' = 'doris',
        'fenodes' = '',
        'table.identifier' = 'database.table',
        'username' = 'root',
        'password' = '',
        'sink.properties.format' = 'json',
        'sink.properties.read_json_by_line' = 'true',
        'sink.enable-delete' = 'true', -- Synchronize delete events.
        'sink.label-prefix' = 'doris_label'
    );

    INSERT INTO doris_sink SELECT id, name FROM cdc_mysql_source;

    To synchronize an entire database or multiple tables from a TP database, you can use the full-database synchronization feature of the Flink Doris Connector, which writes the TP database to Doris with a single command, as shown below:

    <FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-24.0.1.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname= \
    --mysql-conf port=3306 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=mysql_db \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes= \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql:// \
    --sink-conf sink.label-prefix=label \
    --table-conf replication_num=1

    For more details, refer to Full Database Synchronization.
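The real-time sink above sets 'sink.enable-delete' = 'true'. Delete synchronization generally relies on the Doris target table using the Unique Key model, so that rows sharing a key are upserted and can be removed. The DDL below is a minimal sketch of such a table on the Doris side; the database test_db, the table cdc_students, the bucket count, and the replica count are hypothetical and should be adapted to the real source schema and cluster.

```sql
-- Hypothetical Doris target table for the CDC sink; names and sizing are assumptions.
CREATE DATABASE IF NOT EXISTS test_db;

CREATE TABLE IF NOT EXISTS test_db.cdc_students (
    id   INT NOT NULL,   -- should match the primary key of the source table
    name STRING
)
UNIQUE KEY(id)                         -- Unique Key model: rows with the same id are upserted
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ('replication_num' = '1');  -- single replica, suitable only for test clusters
```

With a table like this in place, the Flink sink's 'table.identifier' would point at test_db.cdc_students. The full-database synchronization command creates target tables itself, so a manual DDL of this kind mainly applies to the single-table Flink SQL job.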

Spark Connector

You can read from the source database with Spark's JDBC Source and write to Doris with the Doris Sink of the Spark Connector.

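// Assumes an existing SparkSession named `spark`.
// Read the source table through Spark's JDBC data source.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

// Write the DataFrame to Doris through the Spark Doris Connector.
jdbcDF.write
  .format("doris")
  .option("doris.table.identifier", "db.table")
  .option("doris.fenodes", "")
  .option("user", "root")
  .option("password", "")
  .save()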

For more details, refer to JDBC To Other Databases and Spark Doris Connector.

DataX / SeaTunnel / CloudCanal and Other Third-Party Tools

You can also use third-party tools for data synchronization. For more details, refer to: