A GenericUDF Function to Extract a Field From an Array of Structs


Question

I am trying to write a GenericUDF that, for each record, collects the values of a specific struct field from every element of an array of structs and returns them as an array.

I wrote the GenericUDF below, and it seems to work, but:

1) It does not work when I run it on an external table, although it works fine on a managed table. Any idea why?

2) I am having a tough time writing a test for this. I have attached the test I have so far; it does not work and always fails with either 'java.util.ArrayList cannot be cast to org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector' or 'cannot cast String to LazyString'. My question is: how do I supply a list of structs to the evaluate method?

Any help will be greatly appreciated.

The table:

CREATE EXTERNAL TABLE FOO (  
  TS string,  
  customerId string,  
  products array< struct<productCategory:string> >  
)  
PARTITIONED BY (ds string)  
ROW FORMAT SERDE 'some.serde'  
WITH SERDEPROPERTIES ('error.ignore'='true')  
LOCATION 'some_locations'  
;

A sample row contains:
1340321132000, 'some_company', [{"productCategory":"footwear"},{"productCategory":"eyewear"}]

This is my code:

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;

import java.util.ArrayList;

@Description(name = "extract_product_category",
    value = "_FUNC_( array< struct<productcategory:string> > ) - Collect all product category field values inside an array of struct(s), and return the results in an array<string>",
    extended = "Example:\n SELECT _FUNC_(array_of_structs_with_product_category_field)")
public class GenericUDFExtractProductCategory
        extends GenericUDF
{
    private ArrayList ret;

    private ListObjectInspector listOI;
    private StructObjectInspector structOI;
    private ObjectInspector prodCatOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException
    {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("The function extract_product_category() requires exactly one argument.");
        }

        if (args[0].getCategory() != Category.LIST) {
            throw new UDFArgumentTypeException(0, "Type array<struct> is expected to be the argument for extract_product_category but " + args[0].getTypeName() + " is found instead");
        }

        listOI = ((ListObjectInspector) args[0]);
        structOI = ((StructObjectInspector) listOI.getListElementObjectInspector());

        if (structOI.getAllStructFieldRefs().size() != 1) {
            throw new UDFArgumentTypeException(0, "Incorrect number of fields in the struct, should be one");
        }

        StructField productCategoryField = structOI.getStructFieldRef("productCategory");
        //If not, throw exception
        if (productCategoryField == null) {
            throw new UDFArgumentTypeException(0, "NO \"productCategory\" field in input structure");
        }

        //Are they of the correct types?
        //We store these object inspectors for use in the evaluate() method
        prodCatOI = productCategoryField.getFieldObjectInspector();

        //First are they primitives
        if (prodCatOI.getCategory() != Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0, "productCategory field must be of string type");
        }

        //Are they of the correct primitives?
        if (((PrimitiveObjectInspector)prodCatOI).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0, "productCategory field must be of string type");
        }

        ret = new ArrayList();

        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }

    @Override
    public ArrayList evaluate(DeferredObject[] arguments)
            throws HiveException
    {
        ret.clear();

        if (arguments.length != 1) {
            return null;
        }

        if (arguments[0].get() == null) {
            return null;
        }

        int numElements = listOI.getListLength(arguments[0].get());

        for (int i = 0; i < numElements; i++) {
            LazyString prodCatDataObject = (LazyString) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef("productCategory")));
            Text productCategoryValue = ((StringObjectInspector) prodCatOI).getPrimitiveWritableObject(prodCatDataObject);
            ret.add(productCategoryValue);
        }
        return ret;
    }

    @Override
    public String getDisplayString(String[] strings)
    {
        assert (strings.length > 0);
        StringBuilder sb = new StringBuilder();
        sb.append("extract_product_category(");
        sb.append(strings[0]);
        sb.append(")");
        return sb.toString();
    }
}

My Test:

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;

public class TestGenericUDFExtractShas
{
    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>();

    @Test
    public void simpleTest()
        throws Exception
    {
        ListObjectInspector firstInspector = new MyListObjectInspector();

        ArrayList test = new ArrayList();
        test.add("test");

        ArrayList test2 = new ArrayList();
        test2.add(test);

        StructObjectInspector soi = ObjectInspectorFactory.getStandardStructObjectInspector(test, test2);

        fieldNames.add("productCategory");
        fieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);

        GenericUDF.DeferredObject firstDeferredObject = new MyDeferredObject(test2);

        GenericUDF extract_product_category = new GenericUDFExtractProductCategory();

        extract_product_category.initialize(new ObjectInspector[]{firstInspector});

        extract_product_category.evaluate(new DeferredObject[]{firstDeferredObject});
    }

    public class MyDeferredObject implements DeferredObject
    {
        private Object value;

        public MyDeferredObject(Object value) {
            this.value = value;
        }

        @Override
        public Object get() throws HiveException
        {
            return value;
        }
    }

    private class MyListObjectInspector implements ListObjectInspector
    {
        @Override
        public ObjectInspector getListElementObjectInspector()
        {
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldObjectInspectors);
        }

        @Override
        public Object getListElement(Object data, int index)
        {
            List myList = (List) data;
            if (myList == null || index > myList.size()) {
                return null;
            }
            return myList.get(index);
        }

        @Override
        public int getListLength(Object data)
        {
            if (data == null) {
                return -1;
            }
            return ((List) data).size();
        }

        @Override
        public List<?> getList(Object data)
        {
            return (List) data;
        }

        @Override
        public String getTypeName()
        {
            return null;  //To change body of implemented methods use File | Settings | File Templates.
        }

        @Override
        public Category getCategory()
        {
            return Category.LIST;
        }
    }
}

Solution

I can't speak to the testing, but with a caveat discussed below, I think I have a solution for the issue with external tables.

In adapting your code to my needs, I changed string to long in the evaluate method:

Your code:

LazyString prodCatDataObject = (LazyString) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef("productCategory")));
Text productCategoryValue = ((StringObjectInspector) prodCatOI).getPrimitiveWritableObject(prodCatDataObject);

My old code:

LazyLong indDataObject = (LazyLong) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef(indexName)));
LongWritable indValue = ((LazyLongObjectInspector) indOI).getPrimitiveWritableObject(indDataObject);

As you can see, the logic is the same; only the data types differ.

This worked for me with a non-external (managed) table, but not with an external table.

I was able to resolve this by replacing my old code with this:

long indValue = (Long) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef(indexName)));

In another version, where I was returning text

You can probably do something similar, namely casting to Text or String in the first step.

You may also have to change public Text evaluate(DeferredObject[] arguments) to public Object evaluate(DeferredObject[] arguments).
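
The change above amounts to not assuming a concrete Lazy* class for the field value. A minimal, Hive-free sketch of that idea follows. LazyText, StringInspector, LAZY, and PLAIN are hypothetical stand-ins invented for illustration, not Hive classes. The sketch assumes, without the source confirming it, that the two kinds of table hand the UDF differently typed raw objects. It shows that a hard cast fails on one of them, while asking the matching inspector for the value works for both. In Hive itself, the analogous move would presumably be to pass the raw field object straight to the field's own inspector, for example StringObjectInspector.getPrimitiveWritableObject(Object), without casting it to LazyString first.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class InspectorVsCast {
    // Hypothetical stand-in for a SerDe-specific lazy wrapper (NOT Hive's LazyString).
    static final class LazyText {
        private final byte[] bytes;
        LazyText(String s) { this.bytes = s.getBytes(StandardCharsets.UTF_8); }
        String decode() { return new String(bytes, StandardCharsets.UTF_8); }
    }

    // Hypothetical stand-in for a string object inspector: it knows how to
    // turn the raw object of "its" storage layer into a String.
    interface StringInspector {
        String getString(Object raw);
    }

    static final StringInspector LAZY = raw -> ((LazyText) raw).decode();
    static final StringInspector PLAIN = raw -> (String) raw;

    // Extracts values through the inspector; the caller never casts the raw object.
    static List<String> extract(List<Object> rawValues, StringInspector inspector) {
        List<String> out = new ArrayList<>();
        for (Object raw : rawValues) {
            out.add(raw == null ? null : inspector.getString(raw));
        }
        return out;
    }

    // Returns true if a hard-coded cast to LazyText would fail for this value.
    static boolean hardCastFails(Object raw) {
        try {
            LazyText ignored = (LazyText) raw;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Object> lazyRow = Arrays.asList(new LazyText("footwear"), new LazyText("eyewear"));
        List<Object> plainRow = Arrays.asList("footwear", "eyewear");

        System.out.println(extract(lazyRow, LAZY));          // [footwear, eyewear]
        System.out.println(extract(plainRow, PLAIN));        // [footwear, eyewear]
        System.out.println(hardCastFails(plainRow.get(0)));  // true
    }
}
```

The design point is that the extraction loop depends only on the inspector interface, so the same loop serves whichever representation the storage layer supplies.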

Source code for some working UDFs that handle array is available here.

Now for the caveat: this does not appear to work with tables stored as ORC (neither does the original code, mind you). I will probably post a separate question about this, since I am not sure what the issue is.
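
On the testing half of the question, which the answer above leaves open, a plausible reading of the first error is that the test passes test and test2 (lists of data) to getStandardStructObjectInspector, which expects field names and field inspectors. The data handed to evaluate would instead need the shape of the column: a list whose elements are structs, with each struct commonly represented as a list of its field values when standard object inspectors are used. The Hive-free sketch below only illustrates that nesting. The helpers struct and extractField are hypothetical. Whether the field values must be String or Text depends on the inspector chosen for the field, which is an assumption to verify against your Hive version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class StructListShape {
    // Hypothetical helper: one "struct" with a single productCategory field,
    // modelled as a list of field values in declaration order.
    static List<Object> struct(String productCategory) {
        List<Object> fields = new ArrayList<>();
        fields.add(productCategory);
        return fields;
    }

    // Mirrors the UDF's loop in plain Java: take one field from every struct.
    static List<String> extractField(List<List<Object>> structs, int fieldIndex) {
        List<String> out = new ArrayList<>();
        for (List<Object> s : structs) {
            Object value = s.get(fieldIndex);
            out.add(value == null ? null : value.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        // Shape of one "products" cell: array< struct<productCategory:string> >
        List<List<Object>> products = Arrays.asList(struct("footwear"), struct("eyewear"));
        System.out.println(extractField(products, 0)); // [footwear, eyewear]
    }
}
```

In an actual test, the outer list would be what the deferred object returns, and the inspector passed to initialize would describe the same nesting (list of struct of string).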
