Multi-stream Update for Unique Model
Overview
To ensure concurrent conflict resolution for replace operations, Doris's unique table provides the functionality of updating based on the sequence column. That is, under the same key column, columns of the REPLACE aggregation type will be replaced according to the value of the sequence column. A larger value can replace a smaller one, but not vice versa. However, in some business scenarios, businesses need to update different columns in the same wide table through two or more data streams. For example, one data stream writes in real-time to update some fields of the table; another data stream performs imports on demand to update other columns of the table. During the update, both stream jobs need to ensure the order of replace operations; moreover, during queries, data from all columns should be accessible for querying.
Sequence Mapping
To address the above issue, Doris supports the sequence mapping feature. This feature resolves the problem of concurrent updates from multiple streams by specifying the mapping relationship between the columns to be updated and their corresponding sequence columns.
| A | B | C | D | E | s1 | s2 |
|---|
Assuming the above table is all columns of a unique table, where AB are the key columns and CDE are the value columns. "ABCD" represents data generated by one data stream, and "ABE" represents data generated by another data stream. Both streams need to write to the same table. However, the timing of data generation and updates for ABCD and ABE is not synchronized (with intervals potentially being very long), making it impractical (or requiring significant cost) to concatenate all column data before writing.
We additionally introduce two columns, s1 and s2, as sequence columns to control the updates of data from the two streams. s1 is used for version control of data in columns C and D; s2 is used for version control of data in column E. When importing or performing other update operations, the data from the two streams do not interfere with each other. Each stream completes its update operations based on its own sequence columns.
Usage Example
1. Creating a table supporting sequence mapping
Create a table that supports sequence mapping, and specify that the updates of columns c and d depend on the sequence column s1, while the update of column e depends on the sequence column s2. Sequence columns can be of integer type and time type (DATE, DATETIME), and the type of the column cannot be changed after creation.
CREATE TABLE `upsert_test` (
`a` bigint(20) NULL COMMENT "",
`b` int(11) NULL COMMENT "",
`c` int(11) NULL COMMENT "",
`d` int(11) NULL COMMENT "",
`e` int(11) NULL COMMENT "",
`s1` int(11) NULL COMMENT "",
`s2` int(11) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`a`, `b`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`a`, `b`) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write"="false",
"light_schema_change"="true",
"replication_num" = "1",
"sequence_mapping.s1" = "c,d",
"sequence_mapping.s2" = "e"
);
The table structure is as follows:
MySQL > desc upsert_test;
+-------+--------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------+------+-------+---------+---------+
| a | bigint | Yes | true | NULL | |
| b | int | Yes | true | NULL | |
| c | int | Yes | false | NULL | REPLACE |
| d | int | Yes | false | NULL | REPLACE |
| e | int | Yes | false | NULL | REPLACE |
| s1 | int | Yes | false | NULL | REPLACE |
| s2 | int | Yes | false | NULL | REPLACE |
+-------+--------+------+-------+---------+---------+
2. Insert & Query Data
MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,2,2,2);
Query OK, 1 row affected (0.080 sec)
MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | e | s1 | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 2 | 2 | NULL | 2 | NULL |
+------+------+------+------+------+------+------+
1 row in set (0.049 sec)
MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,1,1,1);
Query OK, 1 row affected (0.048 sec)
MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | e | s1 | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 2 | 2 | NULL | 2 | NULL |
+------+------+------+------+------+------+------+
1 row in set (0.021 sec)
MySQL > insert into upsert_test(a, b, e, s2) values (1,1,2,2);
Query OK, 1 row affected (0.043 sec)
MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | e | s1 | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 2 | 2 | 2 | 2 | 2 |
+------+------+------+------+------+------+------+
1 row in set (0.019 sec)
MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,3,3,3);
Query OK, 1 row affected (0.049 sec)
MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | e | s1 | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 3 | 3 | 2 | 3 | 2 |
+------+------+------+------+------+------+------+
1 row in set (0.019 sec)
MySQL > insert into upsert_test(a, b, c, d, s1,e,s2) values(1,1,5,5,4,5,4);
Query OK, 1 row affected (0.050 sec)
MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | e | s1 | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 5 | 5 | 5 | 4 | 4 |
+------+------+------+------+------+------+------+
1 row in set (0.019 sec)
During the first insertion, since e and s2 are not written, the values read for e and s2 will be null.
During the second insertion, as the value of s1 is smaller than that written in the first insertion, the values of c, d, and s1 will remain unchanged.
During the third insertion, when values for e and s2 are written, all columns will have correct values.
During the fourth insertion, because the value of s1 is greater than the previously written value, c, d, and s1 will all be updated.
During the fifth insertion, since both s1 and s2 values are greater than the previously written values, c, d, s1, e, and s2 will all be updated.
3. Add/Drop Column
CREATE TABLE `upsert_test` (
`a` bigint(20) NULL COMMENT "",
`b` int(11) NULL COMMENT "",
`c` int(11) NULL COMMENT "",
`d` int(11) NULL COMMENT "",
`s1` int(11) NULL COMMENT "",
) ENGINE=OLAP
UNIQUE KEY(`a`, `b`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`a`, `b`) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "false",
"light_schema_change"="true",
"replication_num" = "1",
"sequence_mapping.s1" = "c,d"
);
MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,1,1,1),(1,1,3,3,3),(1,1,2,2,2);
Query OK, 3 rows affected (0.101 sec)
MySQL > select * from upsert_test;
+------+------+------+------+------+
| a | b | c | d | s1 |
+------+------+------+------+------+
| 1 | 1 | 3 | 3 | 3 |
+------+------+------+------+------+
1 row in set (0.057 sec)
MySQL > alter table upsert_test add column (e int(11) NULL, s2 bigint) PROPERTIES('sequence_mapping.s2' = 'e');
Query OK, 0 rows affected (0.011 sec)
MySQL > desc upsert_test;
+-------+--------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------+------+-------+---------+---------+
| a | bigint | Yes | true | NULL | |
| b | int | Yes | true | NULL | |
| c | int | Yes | false | NULL | REPLACE |
| d | int | Yes | false | NULL | REPLACE |
| s1 | int | Yes | false | NULL | REPLACE |
| e | int | Yes | false | NULL | REPLACE |
| s2 | bigint | Yes | false | NULL | REPLACE |
+-------+--------+------+-------+---------+---------+
7 rows in set (0.003 sec)
MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | s1 | e | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 3 | 3 | 3 | NULL | NULL |
+------+------+------+------+------+------+------+
1 row in set (0.032 sec)
MySQL > insert into upsert_test(a, b, e, s2) values (1,1,2,2);
Query OK, 1 row affected (0.052 sec)
MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | s1 | e | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 3 | 3 | 3 | 2 | 2 |
+------+------+------+------+------+------+------+
1 row in set (0.020 sec)
MySQL > insert into upsert_test(a, b, c, d, s1,e,s2) values(1,1,5,5,4,5,4);
Query OK, 1 row affected (0.050 sec)
MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | s1 | e | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 5 | 5 | 4 | 5 | 4 |
+------+------+------+------+------+------+------+
1 row in set (0.022 sec)
MySQL > alter table upsert_test drop column e;
Query OK, 0 rows affected (0.006 sec)
MySQL > select * from upsert_test;
+------+------+------+------+------+------+
| a | b | c | d | s1 | s2 |
+------+------+------+------+------+------+
| 1 | 1 | 5 | 5 | 4 | 4 |
+------+------+------+------+------+------+
1 row in set (0.026 sec)
MySQL > alter table upsert_test drop column s2;
Query OK, 0 rows affected (0.005 sec)
MySQL > select * from upsert_test;
+------+------+------+------+------+
| a | b | c | d | s1 |
+------+------+------+------+------+
| 1 | 1 | 5 | 5 | 4 |
+------+------+------+------+------+
1 row in set (0.014 sec)
Note
-
Light schema change must be enabled, and renaming columns is not currently supported
-
Sequence columns can be of integer type and time type (DATE, DATETIME), and the type of the column cannot be changed after creation.
-
There must be no overlap between all mapped columns. For example, column d in the sample cannot be mapped to both s1 and s2.
-
The sequence column and mapping column cannot be key columns, and all non-key columns must be mapped to a sequence column
-
The mapping relationship cannot be changed. For example, column d in the sample, which has already been mapped to column s1, cannot be modified to map to column s2.
-
Currently only supports MOR tables, does not support enabling simultaneously with sequence columns, and does not support batch deletion operations
-
Creating RollUp is not currently supported
-
If the sequence_mapping attribute is not included when creating a new table, it will not be supported to open it later