How to get the taskID or mapperID (something like partitionID in Spark) in a Hive UDF?

Problem Description

As the question asks: how can I get the taskID or mapperID (something like the partitionID in Spark) in a Hive UDF?

Recommended Answer

I have found the answer on my own; you can get the taskID in a Hive UDF as below:

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class TestUDF extends GenericUDF {
    private Text result = new Text();
    private String tmpStr = "";

    @Override
    public void configure(MapredContext context) {
        // total number of map tasks in this job
        int numTasks = context.getJobConf().getNumMapTasks();
        // task attempt ID string of the current task
        String taskID = context.getJobConf().get("mapred.task.id");
        this.tmpStr = numTasks + "_h_xXx_h_" + taskID;
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentException {
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) {
        result.set(this.tmpStr);
        return this.result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "RowSeq-func()";
    }
}

But this is effective only in the MapReduce execution engine; it does not work in a Spark engine.
Test code as below:

add jar hdfs:///home/dp/tmp/shaw/my_udf.jar;
create temporary function seqx AS 'com.udf.TestUDF';

with core as (
select 
    device_id
from
    test_table
where
    p_date = '20210309' 
    and product = 'google'
distribute by
    device_id
)
select
    seqx() as seqs,
    count(1) as cc
from
    core
group by
    seqx()
order by
    seqs asc 

Result on the MR engine: the task count and the taskID are obtained successfully.

Result on the Spark engine with the same SQL: the UDF is not effective, and we get nothing about the taskID.
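
If your cluster has both engines available, one way to reproduce both results is to toggle Hive's execution engine before rerunning the query above (a sketch; it assumes hive.execution.engine can be switched to spark in your installation):

set hive.execution.engine=mr;     -- seqx() returns the task count and the taskID
set hive.execution.engine=spark;  -- seqx() returns nothing useful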

If you run your HQL on the Spark engine and call the Hive UDF there, and you really need the partitionId in Spark, see the code below:

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import org.apache.spark.TaskContext;

public class TestUDF extends GenericUDF {
    private Text result = new Text();
    private String tmpStr = "";

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentException {
        // get the Spark partitionId when initialize() runs inside a task
        this.tmpStr = TaskContext.getPartitionId() + "-initial-pid";
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) {
        // get the Spark partitionId of the task evaluating this row
        this.tmpStr = TaskContext.getPartitionId() + "-evaluate-pid";
        result.set(this.tmpStr);
        return this.result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "RowSeq-func()";
    }
}

As above, you can get the Spark partitionId by calling TaskContext.getPartitionId() in the overridden initialize or evaluate method of the UDF class.

Notice: your UDF must take a parameter, such as select my_udf(param); this makes the UDF get initialized in multiple tasks. If your UDF takes no parameter, it is initialized on the Driver, and the Driver has no TaskContext and no partitionId, so you would get nothing.
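
For example, here is a sketch of the earlier test query with a column passed to the UDF (assuming this Spark-engine version of TestUDF has been registered as seqx in the same way as before):

select
    seqx(device_id) as pid,
    count(1) as cc
from
    test_table
where
    p_date = '20210309'
    and product = 'google'
group by
    seqx(device_id)
order by
    pid asc;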

Running the above UDF on the Spark engine, we get the partitionIds successfully.
