Routine Load导入原理及最佳实践
1. 概述
Routine Load 用于持续消费 Kafka 数据并写入 Apache Doris。用户可以通过创建 Routine Load Job,来自动订阅指定的 Kafka Topic。其核心特性包括:
- 
高可用:支持 7×24 小时不间断消费 Kafka 数据,且故障恢复后可自动恢复运行。 
- 
低延迟:Kafka 消息可实现秒级可见。 
- 
Exactly Once 语义:确保消费 Kafka 数据不丢不重,实现精确一次消费。 
本文将深入解析实现原理,给出典型场景的最佳实践,并提供常见问题的排查思路,帮助用户快速上手并高效运维。
2. 实现原理
Kafka 数据以流形式存在,Doris 以“微批”(micro-batch)方式消费 Kafka 流式数据。创建 routine load job 后,系统会根据配置的并发度将其拆分为多个 task 并发执行,每个 task 负责消费 Kafka topic 中特定 partition 的数据。每个 task 对应一个事务,执行完成后会生成新的 task 继续消费下一批数据。下文从 job/task 调度、Exactly Once 语义实现及一流多表三个角度进行说明。
2.1 作业(Job)与任务(Task)调度
Routine Load 采用两级调度:
- 
Job 调度: 负责任务拆分、故障恢复与生命周期管理。 
- 
Task 调度: 负责将具体的数据拉取、转换与写入操作的任务分发到 BE 节点执行。 
2.1.1 Job调度
Job 状态机:
| 状态 | 含义 | 
|---|---|
| NEED_SCHEDULE | 等待首次调度或需要重新调度 | 
| RUNNING | 正常消费中 | 
| PAUSED | 主动或异常暂停,可自动恢复 | 
| CANCELLED | 因库/表被删除等不可恢复错误而终止 | 
| STOPPED | 手动停止且不可恢复 | 
根据job状态不同,调度线程每周期(10s)执行以下动作:
- NEED_SCHEDULE: 获取 topic 元数据(partition 数、起始 offset)按
taskNum = min(topic_partition_num,
              desired_concurrent_number,
              max_routine_load_task_concurrent_num)
拆分 task,并将 task 放入 needScheduleTasksQueue等待task调度线程开始调度。
- 
RUNNING: 周期获取 topic 元数据,若 partition 数变化立即重调度。 
- 
PAUSED: 为了确保作业的高可用性,引入了auto-resume机制。在非预期暂停的情况下,Routine Load Scheduler 调度线程会尝试自动恢复作业。对于 Kafka 侧的意外宕机或其他无法工作的情况,自动恢复机制可以确保在 Kafka 恢复后,无需人工干预,导入作业能够继续正常运行。需要注意的是,存在三种不会自动恢复的情况: - 用户手动执行 PAUSE ROUTINE LOAD 命令。
- 数据质量存在问题。
- 无法自动恢复的情况,例如库表被删除。
 除了上述三种情况,其他暂停状态的作业都会尝试自动恢复。 
- 
CANCELLED / STOPPED: 延迟回收资源。 
2.1.2 Task调度
调度条件
- 
task 未读到 partition 末尾,即仍然还有数据可以消费,以避免无效占用资源。 
- 
若上一次已读到 EOF,则距上次开始执行必须超过 max_batch_interval才会发起新一轮调度,目的是在消费速度大于生产速度条件下适当攒批,防止生成太多的小事务。
负载均衡策略
- 优先选择当前运行 Task 数量最少的 BE。
- 若多个 BE 的 Task 数相同,则优先复用已缓存 Kafka Consumer 的节点,以减少初始化开销。
批边界
任一条件满足即结束当前 task:
- 
达到 max_batch_interval定义的时间。
- 
达到 max_batch_rows定义的行数。
- 
达到 max_batch_size定义的bytes大小。
- 
读取到 Kafka EOF,即消费到流末尾。 
Task 结束后提交事务,并立即生成新 task 放入队列等待下一次调度,实现持续消费。
2.2 Exactly-Once 语义
Routine Load 通过 “持久化消费进度” + “提交校验” 双重机制,确保 Kafka 数据不丢不重。
2.2.1 持久化消费进度
每个 task 在事务提交时,将消费进度(progress)随事务信息一起写入 FE 的 edit log,利用 Berkeley DB JE 同步给所有 FE Follower。Master 切换/重启后,进度信息依旧准确。
2.2.2 提交校验
当 Job 因手动暂停、切主或 topic 元数据变化被重调度时,可能短暂出现 两个 task 并发消费同一 partition 的场景。为防止重复写入:
- 
每个 Job 在内存中维护 routineLoadTaskInfoList。
- 
task 提交前会校验自己是否仍在 routineLoadTaskInfoList中,否则拒绝提交。
2.3 一流多表写入
一流多表用于单个 Routine Load Job 同时写入多张目标表,核心流程如下:
- 
规划阶段:由于目标表无法在创建 Job 时完全确定,执行计划会被延迟到运行时,由 BE 动态向 FE Master 获取。 
- 
数据缓存:BE 先将数据缓存在本地的 multi-table pipe。如果缓存至 200 条记录,或5 张尚未请求执行计划的新表,就会发起执行计划请求并执行,防止数据积压。 
- 
执行计划复用:同一事务内会复用已缓存的执行计划,事务间重新请求,保障元信息实时性。 
3. 最佳实践
Routine Load 默认参数已满足绝大多数场景。以下三种情况需要手动调优:
| 场景 | 推荐修改参数 | 
|---|---|
| 低延迟需求 | 将 max_batch_interval由 默认60 s 调小 | 
| 小数据量、资源敏感 | 将 desired_concurrent_number调小 | 
| 高吞吐 | 将 max_batch_interval由 默认60 s 调大至120-180s | 
4. 常见问题排查
4.1 数据堆积
- 通过SHOW ROUTINE LOAD\G查看任务状态:
- 
State 是否为 RUNNING,如果为其他状态,可查看ReasonOfStateChanged字段了解原因。
- 
OtherMsg 是否有报错信息。 
- 
通过 BE 日志判断是否已触达吞吐上限 搜索 consumer group done日志,其中的left_time / left_rows / left_bytes会显示最先触发的阈值,进而针对性调大max_batch_size或max_batch_rows:consumer group done: 894fc32d5b9d3e93-7387a02da6dafd88. consume time(ms)=34004, received rows=2679540, received bytes=2147484043, eos: 0, left_time: 25996, left_rows: 17320460, left_bytes: -395, blocking get time(us): 949236, blocking put time(us): 28730419, id=69616a41fc064f1e-a93ff0ddd217f0a0, job_id=48121487, txn_id=61763720, label=ods_hq_market_unique_jobs_0-48121487-69616a41fc064f1e-a93ff0ddd217f0a0-61763720, elapse(s)=34上例中 left_bytes: -395表示 34 秒内就因max_batch_size到达上限而结束批次。此时可适当调大max_batch_size,让单个批次在max_batch_interval内尽量满载,以提升吞吐。
- 
增加并发与吞吐量 
- 
将 desired_concurrent_number提高到与 Topic 的 Partition 数一致。
- 
适度增加 max_batch_interval(如 120 s ~ 180 s)/max_batch_size/max_batch_rows以提升单事务数据量,增加单批次数据量,减少事务开销。
4.2 任务异常暂停
Routine Load 内置自动恢复机制,绝大多数非预期暂停都会重试。若任务持续处于 PAUSED 且无法自动恢复,可执行 SHOW ROUTINE LOAD 并排查:
- 
是否手动执行 PAUSE ROUTINE LOAD。
- 
是否存在数据质量问题(如格式错误、字段缺失)。 
- 
Kafka数据是否已经过期报错 out of range。