跳到主要内容

Java UDF, UDAF, UDWF, UDTF

概述

Java UDF 为用户提供使用 Java 编写自定义函数的接口,方便用户通过 Java 语言实现 SQL 中无法直接表达的业务逻辑。Apache Doris 支持使用 Java 编写 UDF、UDAF、UDWF 与 UDTF 四类自定义函数。下文如无特殊说明,使用 UDF 统称所有用户自定义函数。

四类自定义函数的定义与典型代表如下:

类型全称行为说明典型函数示例起始支持版本
UDFScalar Function(标量函数)每输入一行,输出一行结果ABS、LENGTH全版本支持
UDAFAggregate Function(聚合函数)多行输入聚合后,输出一行结果MIN、MAX、COUNT全版本支持
UDWFWindow Function(窗口函数)在窗口范围(一行或多行)内为每行返回一个值ROW_NUMBER、RANK、DENSE_RANK全版本支持
UDTFTable Function(表函数)每输入一行,输出一行或多行;需结合 Lateral View 使用,可实现行转列EXPLODE、EXPLODE_SPLITDoris 3.0 起

对于已经在 Hive 上积累了大量自定义函数的用户,Java UDF 可以直接迁移至 Doris,无需重写。

适用场景

  • 业务需要在 SQL 中执行 Doris 内置函数无法覆盖的标量计算、聚合统计或行展开逻辑。
  • 已有 Hive Java UDF 资产,需要平滑迁移至 Doris。
  • 自定义函数中需要加载较大资源文件(如词典、模型),或希望复用全局连接池等单例资源。

数据类型映射

下表列出了 Doris 数据类型与 Java UDF 入参/返回值类型之间的对应关系:

Doris 数据类型Java UDF 参数类型
BoolBoolean
TinyIntByte
SmallIntShort
IntInteger
BigIntLong
LargeIntBigInteger
FloatFloat
DoubleDouble
DateLocalDate
DatetimeLocalDateTime
IPV4 / IPV6InetAddress
StringString
DecimalBigDecimal
array<Type>ArrayList<Type>List<Type>(支持嵌套)
map<Type1,Type2>HashMap<Type1,Type2>Map<Type1,Type2>(支持嵌套)
struct<Type...>ArrayList<Object>(从 3.0.0 版本开始支持)、List<Type>
VarBinarybyte[]Byte[](从 4.0 版本开始支持 VarBinary 类型,优先建议使用 byte[],可减少一层额外转换)
提示

arraymapstruct 类型可以嵌套其它类型。例如,Doris 中的 array<array<int>> 对应的 Java UDF 参数类型为 ArrayList<ArrayList<Integer>>,其他类型依此类推。List<Type>Map<Type1,Type2> 形式的支持从 3.1.0 版本开始。

注意

在创建函数时,请务必使用 string 类型,而不是 varchar,否则可能导致函数执行失败。

使用限制

  1. 不支持复杂数据类型 HLL 与 Bitmap。

  2. 允许用户自行指定 JVM 最大堆大小,对应配置项为 be.confJAVA_OPTS-Xmx 部分;默认 1024 MB。如果聚合数据量较大,建议适当调大,以提升性能并降低内存溢出风险。

  3. 由于 JVM 加载同名类的限制,不要同时使用多个同名类作为 UDF 实现。如需更新某个同名类的 UDF,需要重启 BE 重新加载 classpath。

  4. 同名函数的处理规则:

    • 用户可以创建与内置函数签名完全相同的自定义函数。默认情况下,系统会优先匹配内置函数。
    • 如果在调用时显式指定了 database(即 db.function()),则会被强制识别为用户自定义函数。
    • 在 3.0.7 版本中新增了会话变量 prefer_udf_over_builtin。当其设置为 true 时,会优先匹配用户自定义函数,便于用户从其他系统迁移到 Doris 时,在不改变函数名称的前提下保持原有系统的函数行为。

快速上手

本节介绍如何开发并注册 Java UDF。在 samples/doris-demo/java-udf-demo/ 目录下提供了示例代码供参考,也可以查看 GitHub 上的 demo

UDF 的使用方式与普通函数一致,唯一的区别在于:

  • 内置函数的作用域是全局的;
  • UDF 的作用域是 DB 内部。

因此,如果当前会话位于某个数据库内部,直接使用 UDF 名字会在当前 DB 中查找对应的 UDF;否则需要显式指定 UDF 所在的数据库名字,例如 dbName.funcName

为便于演示,后续示例统一在 test_table 上进行测试。建表语句如下:

CREATE TABLE `test_table` (
id int NULL,
d1 double NULL,
str string NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1");

insert into test_table values (1, 111.11, "a,b,c");
insert into test_table values (6, 666.66, "d,e");

Java UDF 示例

使用 Java 编写 UDF 时,主入口必须为 evaluate 函数,这一点与 Hive 等其他引擎保持一致。下例编写一个 AddOne UDF,对整型输入执行加一操作。

  1. 编写 Java 代码并打包生成 JAR 包:

    public class AddOne extends UDF {
    public Integer evaluate(Integer value) {
    return value == null ? null : value + 1;
    }
    }
  2. 在 Doris 中注册 Java UDF 函数。更多语法可参阅 CREATE FUNCTION

    CREATE FUNCTION java_udf_add_one(int) RETURNS int PROPERTIES (
    "file"="file:///path/to/java-udf-demo-jar-with-dependencies.jar",
    "symbol"="org.apache.doris.udf.AddOne",
    "always_nullable"="true",
    "type"="JAVA_UDF"
    );
  3. 调用 UDF。使用 UDF 必须拥有对应数据库的 SELECT 权限。如需查看注册成功的 UDF,可使用 SHOW FUNCTIONS 命令。

    select id,java_udf_add_one(id) from test_table;
    +------+----------------------+
    | id | java_udf_add_one(id) |
    +------+----------------------+
    | 1 | 2 |
    | 6 | 7 |
    +------+----------------------+
  4. 当不再需要某个 UDF 函数时,可以使用 DROP FUNCTION 命令删除。

如果定义的 UDF 中需要加载较大的资源文件,或希望定义全局 static 变量,可参考下文「最佳实践」一节。

Java UDAF 示例

使用 Java 编写 UDAF 时,需要实现一组规定的函数(标记为 required)以及一个内部类 State。下面通过两个示例进行说明。

  1. 编写对应的 Java UDAF 代码并打包生成 JAR 包。
示例 1:SimpleDemo 实现一个类似 sum 的简单聚合函数,输入参数为 INT,输出参数为 INT
package org.apache.doris.udf;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.logging.Logger;

public class SimpleDemo {

Logger log = Logger.getLogger("SimpleDemo");

//Need an inner class to store data
/*required*/
public static class State {
/*some variables if you need */
public int sum = 0;
}

/*required*/
public State create() {
/* here could do some init work if needed */
return new State();
}

/*required*/
public void destroy(State state) {
/* here could do some destroy work if needed */
}

/*Not Required*/
public void reset(State state) {
/*if you want this udaf function can work with window function.*/
/*Must impl this, it will be reset to init state after calculate every window frame*/
state.sum = 0;
}

/*required*/
//first argument is State, then other types your input
public void add(State state, Integer val) throws Exception {
/* here doing update work when input data*/
if (val != null) {
state.sum += val;
}
}

/*required*/
public void serialize(State state, DataOutputStream out) throws IOException {
/* serialize some data into buffer */
out.writeInt(state.sum);
}

/*required*/
public void deserialize(State state, DataInputStream in) throws IOException {
/* deserialize get data from buffer before you put */
int val = in.readInt();
state.sum = val;
}

/*required*/
public void merge(State state, State rhs) throws Exception {
/* merge data from state */
state.sum += rhs.sum;
}

/*required*/
//return Type you defined
public Integer getValue(State state) throws Exception {
/* return finally result */
return state.sum;
}
}
示例 2:MedianUDAF 实现计算中位数的功能,输入类型为 (DOUBLE, INT),输出类型为 DOUBLE
package org.apache.doris.udf.demo;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.logging.Logger;

/*UDAF 计算中位数*/
public class MedianUDAF {
Logger log = Logger.getLogger("MedianUDAF");

//状态存储
public static class State {
//返回结果的精度
int scale = 0;
//是否是某一个 tablet 下的某个聚合条件下的数据第一次执行 add 方法
boolean isFirst = true;
//数据存储
public StringBuilder stringBuilder;
}

//状态初始化
public State create() {
State state = new State();
//根据每个 tablet 下的聚合条件需要聚合的数据量大小,预先初始化,增加性能
state.stringBuilder = new StringBuilder(1000);
return state;
}


//处理执行单位处理各自 tablet 下的各自聚合条件下的每个数据
public void add(State state, Double val, int scale) throws IOException {
if (val != null && state.isFirst) {
state.stringBuilder.append(scale).append(",").append(val).append(",");
state.isFirst = false;
} else if (val != null) {
state.stringBuilder.append(val).append(",");
}
}

//处理数据完需要输出等待聚合
public void serialize(State state, DataOutputStream out) throws IOException {
//目前暂时只提供 DataOutputStream,如果需要序列化对象可以考虑拼接字符串,转换 json,序列化成字节数组等方式
//如果要序列化 State 对象,可能需要自己将 State 内部类实现序列化接口
//最终都是要通过 DataOutputStream 传输
out.writeUTF(state.stringBuilder.toString());
}

//获取处理数据执行单位输出的数据
public void deserialize(State state, DataInputStream in) throws IOException {
String string = in.readUTF();
state.scale = Integer.parseInt(String.valueOf(string.charAt(0)));
StringBuilder stringBuilder = new StringBuilder(string.substring(2));
state.stringBuilder = stringBuilder;
}

//聚合执行单位按照聚合条件合并某一个键下数据的处理结果 ,每个键第一次合并时,state1 参数是初始化的实例
public void merge(State state1, State state2) throws IOException {
state1.scale = state2.scale;
state1.stringBuilder.append(state2.stringBuilder.toString());
}

//对每个键合并后的数据进行并输出最终结果
public Double getValue(State state) throws IOException {
String[] strings = state.stringBuilder.toString().split(",");
double[] doubles = new double[strings.length + 1];
doubles = Arrays.stream(strings).mapToDouble(Double::parseDouble).toArray();

Arrays.sort(doubles);
double n = doubles.length - 1;
double index = n * 0.5;

int low = (int) Math.floor(index);
int high = (int) Math.ceil(index);

double value = low == high ? (doubles[low] + doubles[high]) * 0.5 : doubles[high];

BigDecimal decimal = new BigDecimal(value);
return decimal.setScale(state.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
}

//每个执行单位执行完都会执行
public void destroy(State state) {
}

}
  1. 在 Doris 中注册 Java UDAF 函数。更多语法可参阅 CREATE FUNCTION

    CREATE AGGREGATE FUNCTION simple_demo(INT) RETURNS INT PROPERTIES (
    "file"="file:///pathTo/java-udaf.jar",
    "symbol"="org.apache.doris.udf.SimpleDemo",
    "always_nullable"="true",
    "type"="JAVA_UDF"
    );
  2. 调用 Java UDAF。可分组聚合,也可对全部结果进行聚合:

    select simple_demo(id) from test_table group by id;
    +-----------------+
    | simple_demo(id) |
    +-----------------+
    | 1 |
    | 6 |
    +-----------------+
    select simple_demo(id) from test_table;
    +-----------------+
    | simple_demo(id) |
    +-----------------+
    | 7 |
    +-----------------+

Java UDWF 示例

Java UDWF 的代码结构与 Java UDAF 完全一致,只需额外实现 reset 接口,将所有 state 状态置为初始值即可:

void reset(State state)
  1. 编写并打包 Java UDWF 代码(同上)。

  2. 在 Doris 中注册 Java UDWF 函数,注册方式与 Java UDAF 一致。更多语法可参阅 CREATE FUNCTION

    CREATE AGGREGATE FUNCTION simple_demo_window(INT) RETURNS INT PROPERTIES (
    "file"="file:///pathTo/java-udaf.jar",
    "symbol"="org.apache.doris.udf.SimpleDemo",
    "always_nullable"="true",
    "type"="JAVA_UDF"
    );
  3. 调用 Java UDWF,可在指定窗口范围内计算结果。更多语法请参考窗口函数

    select id, simple_demo_window(id) over(partition by id order by d1 rows between 1 preceding and 1 following) as res from test_table;
    +------+------+
    | id | res |
    +------+------+
    | 1 | 1 |
    | 6 | 6 |
    +------+------+

Java UDTF 示例

提示

UDTF 自 Doris 3.0 版本开始支持。

UDTF 与 UDF 一样需要用户实现 evaluate 方法,但 UDTF 的返回值必须是 Array 类型。

  1. 编写对应的 Java UDTF 代码并打包生成 JAR 包:

    public class UDTFStringTest {
    public ArrayList<String> evaluate(String value, String separator) {
    if (value == null || separator == null) {
    return null;
    } else {
    return new ArrayList<>(Arrays.asList(value.split(separator)));
    }
    }
    }
  2. 在 Doris 中注册 Java UDTF 函数。注册时会同时生成两个 UDTF:函数名后带 _outer 后缀的版本针对结果为 0 行的场景做了特殊处理,详情可参考 OUTER 组合器。更多语法可参阅 CREATE FUNCTION

    CREATE TABLES FUNCTION java-utdf(string, string) RETURNS array<string> PROPERTIES (
    "file"="file:///pathTo/java-udtf.jar",
    "symbol"="org.apache.doris.udf.demo.UDTFStringTest",
    "always_nullable"="true",
    "type"="JAVA_UDF"
    );
  3. 调用 Java UDTF。在 Doris 中使用 UDTF 需要结合 Lateral View,以实现行转列效果:

    select id, str, e1 from test_table lateral view java_utdf(str,',') tmp as e1;
    +------+-------+------+
    | id | str | e1 |
    +------+-------+------+
    | 1 | a,b,c | a |
    | 1 | a,b,c | b |
    | 1 | a,b,c | c |
    | 6 | d,e | d |
    | 6 | d,e | e |
    +------+-------+------+

最佳实践:Static 变量加载

在 Doris 中执行一个 UDF 函数(例如 select udf(col) from table)时,每个并发 Instance 会加载一次 udf.jar 包,并在该 Instance 结束时卸载。

由此带来两个常见问题:

  • udf.jar 文件中需要加载几百 MB 的资源文件时,并发会使内存占用急剧增大,容易触发 OOM。
  • 想使用连接池等需要在 static 区域只初始化一次的对象时,按当前并发模型无法实现。

下面提供两个解决方案。其中方案二需要 Doris 版本在 branch-3.0 及以上。

方案 1:拆分资源 JAR 包

将资源加载相关的代码拆分出来,单独生成一个 JAR 包,让其他业务 JAR 引用该资源 JAR。

假设代码已被拆分为 DictLibrary(资源类)和 FunctionUdf(业务类)两个文件。

public class DictLibrary {
private static HashMap<String, String> res = new HashMap<>();

static {
// suppose we built this dictionary from a certain local file.
res.put("key1", "value1");
res.put("key2", "value2");
res.put("key3", "value3");
res.put("0", "value4");
res.put("1", "value5");
res.put("2", "value6");
}

public static String evaluate(String key) {
if (key == null) {
return null;
}
return res.get(key);
}
}
public class FunctionUdf {
public String evaluate(String key) {
String value = DictLibrary.evaluate(key);
return value;
}
}

操作步骤如下:

  1. 单独编译 DictLibrary 文件,生成独立的资源 JAR 包 DictLibrary.jar

    javac   ./DictLibrary.java
    jar -cf ./DictLibrary.jar ./DictLibrary.class
  2. 编译 FunctionUdf 文件,引用上一步的资源包作为依赖,得到 UDF 业务包 FunctionUdf.jar

    javac -cp ./DictLibrary.jar  ./FunctionUdf.java
    jar -cvf ./FunctionUdf.jar ./FunctionUdf.class
  3. 为了让资源 JAR 包被所有并发共享,需让其被 JVM 直接加载。将其放入指定路径 be/custom_lib 下,BE 服务重启后即可随 JVM 启动一并加载,随服务停止而释放。

  4. 最后使用 CREATE FUNCTION 语句创建 UDF 函数,每次实例卸载时仅卸载 FunctionUdf.jar

    CREATE FUNCTION java_udf_dict(string) RETURNS string PROPERTIES (
    "file"="file:///pathTo/FunctionUdf.jar",
    "symbol"="org.apache.doris.udf.FunctionUdf",
    "always_nullable"="true",
    "type"="JAVA_UDF"
    );

方案 2:BE 全局缓存 JAR 包

BE 全局缓存 JAR 包,并支持自定义过期淘汰时间。在 CREATE FUNCTION 时增加以下两个属性字段:

属性说明默认值
static_load是否使用静态 cache 加载方式false
expiration_timeJAR 包过期时间,单位为分钟360

工作机制如下:

  • 启用静态 cache 加载方式后,第一次调用该 UDF 时,初始化完成后会将该 UDF 实例缓存起来。
  • 后续调用该 UDF 时,先在 cache 中查找;若未命中,再执行相关初始化操作。
  • 后台有线程定期检查,如果在配置的过期淘汰时间内一直未被调用,则会从 cache 中清理掉。
  • 如果在过期前被再次调用,则会自动更新缓存时间点。

示例代码如下:

public class Print extends UDF {
static Integer val = 0;
public Integer evaluate() {
val = val + 1;
return val;
}
}
CREATE FUNCTION print_12() RETURNS int 
PROPERTIES (
"file" = "file:///path/to/java-udf-demo-jar-with-dependencies.jar",
"symbol" = "org.apache.doris.udf.Print",
"always_nullable"="true",
"type" = "JAVA_UDF",
"static_load" = "true", // default value is false
"expiration_time" = "60" // default value is 360 minutes
);

可以看到执行结果一直在递增,说明加载的 JAR 包没有被卸载后又重新加载(否则变量会被重新初始化为 0):

mysql [test_query_qa]>select print_12();
+------------+
| print_12() |
+------------+
| 1 |
+------------+
1 row in set (0.40 sec)

mysql [test_query_qa]>select print_12();
+------------+
| print_12() |
+------------+
| 2 |
+------------+
1 row in set (0.03 sec)

mysql [test_query_qa]>select print_12();
+------------+
| print_12() |
+------------+
| 3 |
+------------+
1 row in set (0.04 sec)