In Apache Spark, converting JavaRDD&lt;Row&gt; to Dataset&lt;Row&gt; gives exception: ArrayList is not a valid external type for schema of string


Problem Description

I am using the hbase-spark connector to fetch HBase data into a Spark JavaRDD&lt;Row&gt; (which I believe works, since I am able to print the fetched HBase data). Then I try to convert that JavaRDD&lt;Row&gt; to a Dataset&lt;Row&gt;, but this gives me an error, shown further down in the post. First, here is what my code looks like.

private static JavaRDD<Row> loadHBaseRDD() throws ParseException
{
    //form list of row keys
    List<byte[]> rowKeys = new ArrayList<byte[]>(5);
    //ids is a class-level variable holding the ids used as row keys
    ids.forEach(id -> {
        rowKeys.add(Bytes.toBytes(id));     
    });
    JavaRDD<byte[]> rdd = jsc.parallelize(rowKeys);

    //make hbase-spark connector call (jhbc is a class-level JavaHBaseContext; 2 is the batch size)
    JavaRDD<Row> resultJRDD = jhbc.bulkGet(TableName.valueOf("table1"), 2, rdd, new GetFunction(), new ResultFunction());

    return resultJRDD;
}

Notice that bulkGet() accepts instances of the GetFunction and ResultFunction classes. The GetFunction class has a single method which returns an instance of the Get class (from the HBase client):

public static class GetFunction implements Function<byte[], Get> {
    private static final long serialVersionUID = 1L;
    public Get call(byte[] v) throws Exception {
        return new Get(v);
    }
}

ResultFunction has a function which converts an instance of Result (an HBase client class) to a Row:

public static class ResultFunction implements Function<Result, Row> 
{
    private static final long serialVersionUID = 1L;
    public Row call(Result result) throws Exception 
    {
        List<String> values = new ArrayList<String>(); //notice this is an ArrayList; we come back to this later

        for (Cell cell : result.rawCells()) {
            values.add(Bytes.toString(CellUtil.cloneValue(cell)));
        }
        return RowFactory.create(values);
    }
}

When I call loadHBaseRDD() and print the returned value, it prints the values correctly:

JavaRDD<Row> hbaseJavaRDD = loadHBaseRDD();
hbaseJavaRDD.foreach(row -> { 
    System.out.println(row);   //this prints rows correctly
}); 

This means the rows have been correctly fetched from HBase into Spark. Now I want to convert the JavaRDD&lt;Row&gt; to a Dataset&lt;Row&gt; as explained here. Thus I first create the StructType:

StructType schema = //create schema
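
For reference, a minimal sketch of what the schema construction might look like, assuming one nullable StringType column per HBase cell value (the column names below are hypothetical, not from my actual code):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;

//hypothetical column names; one StringType field per expected cell value
StructType schema = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField("col1", DataTypes.StringType, true),
    DataTypes.createStructField("col2", DataTypes.StringType, true),
    DataTypes.createStructField("col3", DataTypes.StringType, true)));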

Then I try converting the JavaRDD to a DataFrame:

Dataset<Row> hbaseDataFrame = sparksession1.createDataFrame(hbaseJavaRDD, schema);
hbaseDataFrame.show(false);

This throws an exception with a very big stack trace (only part of which is shown below), occurring at the line hbaseDataFrame.show(false), with the first line as follows:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.ArrayList is not a valid external type for schema of string

It seems that, because values is of type ArrayList inside ResultFunction.call(), it gives the exception java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.ArrayList is not a valid external type for schema of string.
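
A plausible explanation, based on how RowFactory.create(Object... values) handles varargs (a sketch worth verifying against your Spark version): passing the List as a single argument yields a one-column Row whose value is the ArrayList itself, which cannot be encoded against a StringType column, whereas spreading the elements yields one string column per value:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import java.util.Arrays;
import java.util.List;

List<String> values = Arrays.asList("a", "b", "c");

//one column whose single value is the List itself -> "ArrayList is not a valid external type for schema of string"
Row oneColumn = RowFactory.create(values);

//toArray() returns Object[], which spreads across the varargs -> three separate string columns
Row threeColumns = RowFactory.create(values.toArray());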

There is a [similar question] on Stack Overflow which has an answer saying that instead of a list, one should return String[][]. Though I don't get the reasoning behind returning String[][], I modified ResultFunction to have values of type String[][]:

public static class ResultFunction implements Function<Result, Row> 
{
    private static final long serialVersionUID = 1L;
    public Row call(Result result) throws Exception 
    {
        String[] values = new String[result.rawCells().length];
        String[][] valuesWrapped = new String[1][]; 

        for(int i=0;i<result.rawCells().length;i++)
        {
            values[i] = Bytes.toString(CellUtil.cloneValue(result.rawCells()[i]));
        }
        valuesWrapped[0] = values;
        return RowFactory.create(valuesWrapped);
    }
}

It gives the below exception at the same line, hbaseDataFrame.show(false):

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: [[Ljava.lang.String; is not a valid external type for schema of string

Finally, I modified the ResultFunction class again so that the values variable is of type String[]:

public static class ResultFunction implements Function<Result, Row>
{
    private static final long serialVersionUID = 1L;
    public Row call(Result result) throws Exception 
    {
        String[] values = new String[result.rawCells().length];     
        for(int i=0;i<result.rawCells().length;i++)
        {
            values[i] = Bytes.toString(CellUtil.cloneValue(result.rawCells()[i]));
        }
        //String[] is an Object[], so it spreads across RowFactory.create's varargs: one column per cell value
        return RowFactory.create(values);
    }
}

And this gives me an exception with a big stack trace whose starting line is:

java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 14

So what might be going wrong here? And how am I supposed to do this?

Recommended Answer

The last approach (building the Row from String[] values) was correct. The issue was an ill-formed schema: it turned out I had one more column in the schema than is present in the data. (The culprit was an extra space character in the schema string, in which column names are separated by single spaces; the extra space was creating an extra, empty column.)
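
A minimal sketch of a more defensive way to build such a schema, assuming (as the answer implies) the column names come from a single space-separated string; trimming and splitting on "\\s+" collapses runs of whitespace, so a stray space cannot create a phantom column (the column string below is hypothetical):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;

//hypothetical column string; note the accidental double space before col3
String columnString = "col1 col2  col3";

//trim() + split("\\s+") yields exactly one name per column, with no empty entries
StructField[] fields = Arrays.stream(columnString.trim().split("\\s+"))
        .map(name -> DataTypes.createStructField(name, DataTypes.StringType, true))
        .toArray(StructField[]::new);
StructType schema = DataTypes.createStructType(Arrays.asList(fields));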
