How to get the taskID or mapperID (something like partitionID in Spark) in a Hive UDF?
Question
As the title says: how can I get the taskID or mapperID (something like the partitionID in Spark) inside a Hive UDF?
Answer
I found the answer on my own: you can get the taskID in a Hive UDF as shown below.
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class TestUDF extends GenericUDF {

    private Text result = new Text();
    private String tmpStr = "";

    @Override
    public void configure(MapredContext context) {
        // total number of map tasks in the job
        int numTasks = context.getJobConf().getNumMapTasks();
        // task-attempt id of the task running this UDF instance
        String taskID = context.getJobConf().get("mapred.task.id");
        this.tmpStr = numTasks + "_h_xXx_h_" + taskID;
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentException {
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) {
        result.set(this.tmpStr);
        return this.result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "RowSeq-func()";
    }
}
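The string stored in mapred.task.id is the full task-attempt id, which by convention is a set of underscore-separated fields ending in the task index and the attempt number. If you only need the numeric mapper index, a small helper like the hypothetical parseMapperIndex below (not part of the original answer) could be added to the class; it assumes the standard layout, so adjust it if your cluster formats the id differently.

    // Hypothetical helper (not part of the original answer): pull the numeric
    // task index out of a Hadoop task-attempt id such as the value of
    // "mapred.task.id". Assumes the usual underscore-separated layout where
    // the task index is the second-to-last field.
    private static int parseMapperIndex(String taskAttemptId) {
        String[] parts = taskAttemptId.split("_");
        return Integer.parseInt(parts[parts.length - 2]);
    }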
Note that this only works on the MapReduce execution engine; it does not work on the SparkSQL engine.

Test code:
add jar hdfs:///home/dp/tmp/shaw/my_udf.jar;
create temporary function seqx AS 'com.udf.TestUDF';

with core as (
    select
        device_id
    from
        test_table
    where
        p_date = '20210309'
        and product = 'google'
    distribute by
        device_id
)
select
    seqx() as seqs,
    count(1) as cc
from
    core
group by
    seqx()
order by
    seqs asc;
The result on the MR engine is shown below; you can see that the task count and taskID were retrieved successfully:

With the same SQL on the Spark engine, the UDF is not effective and we get nothing about the taskID:
If you run your HQL on the Spark engine, call the Hive UDF from it, and really need the partitionId in Spark, use the code below:
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import org.apache.spark.TaskContext;

public class TestUDF extends GenericUDF {

    private Text result = new Text();
    private String tmpStr = "";

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentException {
        // get the Spark partitionId of the task that initializes this UDF
        this.tmpStr = TaskContext.getPartitionId() + "-initial-pid";
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) {
        // get the Spark partitionId of the task that evaluates this row
        this.tmpStr = TaskContext.getPartitionId() + "-evaluate-pid";
        result.set(this.tmpStr);
        return this.result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "RowSeq-func()";
    }
}
As shown above, you can get the Spark partitionId by calling TaskContext.getPartitionId() in the overridden initialize or evaluate method of the UDF class.

Notice: your UDF must take a parameter, e.g. select my_udf(param); that is what makes the UDF get initialized inside multiple tasks. If your UDF takes no parameter, it is initialized on the Driver, and the Driver has no TaskContext or partitionId, so you would get nothing. A sketch of such a call follows this note.
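For illustration, the earlier test query could be rewritten to pass a column into the UDF so that it is initialized inside the Spark tasks. This is only a sketch, and it assumes that seqx, like the TestUDF above, simply ignores whatever argument it receives.

-- passing device_id forces seqx to be initialized and evaluated inside the tasks
select
    seqx(device_id) as seqs,
    count(1) as cc
from
    test_table
where
    p_date = '20210309'
    and product = 'google'
group by
    seqx(device_id);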
The image below shows the result produced by the above UDF on the Spark engine; you can see that the partitionIds were retrieved successfully: