The UDFs covered previously take one row of input and produce one row of output. This post introduces UDTFs, which take one row of input and produce multiple rows of output. They are written against the GenericUDTF interface, which is more involved than a plain UDF.

0x00 Developing a Custom GenericUDTF

Writing a GenericUDTF takes two steps:

  • extend the org.apache.hadoop.hive.ql.udf.generic.GenericUDTF class;
  • override the three methods initialize, process, and close;

Each method plays a different role:

// Declares the input and output schema: the ObjectInspectors of the input
// arguments, and the StructObjectInspector describing the output rows
abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;

// Processes one input record and emits the output rows via forward()
abstract void process(Object[] record) throws HiveException;

// Notifies the UDTF that there are no more rows to process;
// cleanup can be done here, and additional output rows may still be emitted
abstract void close() throws HiveException;

The forward() method used inside process() deserves a note: each input row may yield multiple output rows, and forward() must be called once per output row, passing a row in the format (one or more columns) declared by initialize().
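To make the fan-out concrete, here is a minimal plain-Java sketch with no Hive dependency: the `sink` list is a hypothetical stand-in for forward(), and `explode` mimics the list case of process(). It only illustrates the one-input-row-to-many-output-rows flow, not the real Hive API.

```java
import java.util.ArrayList;
import java.util.List;

public class ForwardSketch {

    // Hypothetical stand-in for process(): "sink" plays the role of
    // forward() -- each element added to it is one emitted output row.
    static void explode(Object[] inputRow, List<Object[]> sink) {
        List<?> list = (List<?>) inputRow[0]; // the single array argument
        for (Object element : list) {
            sink.add(new Object[] { element }); // one forward() call per element
        }
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        // One input row whose only column is the array ["a", "b", "c"].
        explode(new Object[] { List.of("a", "b", "c") }, rows);
        System.out.println(rows.size());    // 3 -- one output row per element
        System.out.println(rows.get(0)[0]); // a
    }
}
```

One input row holding a three-element array produces three single-column output rows, which is exactly the per-row call pattern forward() expects.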

0x01 The Official ExplodeGenericUDTF Example Code

package com.data.hive;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.TaskExecutionException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

@Description(
    name = "explode",
    value = "_FUNC_(a) - separates the elements of array a into multiple rows,"
        + " or the elements of a map into multiple rows and columns ")
public class ExplodeGenericUDTF extends GenericUDTF {

  private transient ObjectInspector inputOI = null;

  @Override
  public void close() throws HiveException {
  }

  @Override
  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1) {
      throw new UDFArgumentException("explode() takes only one argument");
    }

    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

    switch (args[0].getCategory()) {
    case LIST:
      inputOI = args[0];
      fieldNames.add("col");
      fieldOIs.add(((ListObjectInspector) inputOI).getListElementObjectInspector());
      break;
    case MAP:
      inputOI = args[0];
      fieldNames.add("key");
      fieldNames.add("value");
      fieldOIs.add(((MapObjectInspector) inputOI).getMapKeyObjectInspector());
      fieldOIs.add(((MapObjectInspector) inputOI).getMapValueObjectInspector());
      break;
    default:
      throw new UDFArgumentException("explode() takes an array or a map as a parameter");
    }

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
  }

  private transient final Object[] forwardListObj = new Object[1];
  private transient final Object[] forwardMapObj = new Object[2];

  @Override
  public void process(Object[] o) throws HiveException {
    switch (inputOI.getCategory()) {
    case LIST:
      ListObjectInspector listOI = (ListObjectInspector) inputOI;
      List<?> list = listOI.getList(o[0]);
      if (list == null) {
        return;
      }
      for (Object r : list) {
        forwardListObj[0] = r;
        forward(forwardListObj); // emit one output row
      }
      break;
    case MAP:
      MapObjectInspector mapOI = (MapObjectInspector) inputOI;
      Map<?, ?> map = mapOI.getMap(o[0]);
      if (map == null) {
        return;
      }
      for (Entry<?, ?> r : map.entrySet()) {
        forwardMapObj[0] = r.getKey();
        forwardMapObj[1] = r.getValue();
        forward(forwardMapObj); // emit one output row
      }
      break;
    default:
      throw new TaskExecutionException("explode() can only operate on an array or a map");
    }
  }

  @Override
  public String toString() {
    return "explode";
  }
}

0x02 Code Walkthrough

The methods are invoked in the following order:

  1. initialize() is called
    1.1 check the number of input arguments
    1.2 check that the argument type is LIST or MAP, and save inputOI for later use in process()
    1.3 return a StructObjectInspector defining the output row format

  2. process() is called
    2.1 determine the data type of the input argument
    2.2 for a LIST, emit one single-column row per element
    2.3 for a MAP, emit one two-column row per key/value pair
    2.4 throw an exception for any other data type

  3. close() is called, and does nothing
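The call order above can be simulated with a small plain-Java sketch. The `LifecycleSketch` class below is a hypothetical stand-in for GenericUDTF (no Hive dependency); it simply records which method runs when, showing that initialize() and close() bracket one process() call per input row.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    // Records the sequence of lifecycle calls for inspection.
    final List<String> trace = new ArrayList<>();

    // Stand-ins for the three GenericUDTF methods.
    void initialize() { trace.add("initialize"); }
    void process(Object row) { trace.add("process(" + row + ")"); }
    void close() { trace.add("close"); }

    // How the engine drives a UDTF: initialize once,
    // process once per input row, then close once.
    void run(List<Object> inputRows) {
        initialize();
        for (Object row : inputRows) {
            process(row);
        }
        close();
    }

    public static void main(String[] args) {
        LifecycleSketch s = new LifecycleSketch();
        s.run(List.of("r1", "r2"));
        System.out.println(s.trace);
        // [initialize, process(r1), process(r2), close]
    }
}
```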

References

Hive DeveloperGuide UDTF
An overview of HiveQL functions