Spark SQL: Nested classes to parquet error


Problem Description

I can't seem to write a JavaRDD&lt;T&gt; to parquet, where T is, say, a Person class. I've defined it as:

public class Person implements Serializable
{
    private static final long serialVersionUID = 1L;
    private String name;
    private String age;
    private Address address;
....

Address:

public class Address implements Serializable
{
    private static final long serialVersionUID = 1L;
    private String City; private String Block;
    ...<getters and setters>

I then create a JavaRDD like so:

JavaRDD<Person> people = sc.textFile("/user/johndoe/spark/data/people.txt").map(new Function<String, Person>()
    {
        public Person call(String line)
        {
            String[] parts = line.split(",");
            Person person = new Person();
            person.setName(parts[0]);
            person.setAge("2");
            Address address = new Address("HomeAdd","141H");
            person.setAddress(address);
            return person;
        }
    });

Note - I am manually setting the same Address for everyone, so this is basically a nested RDD. On trying to save this as a parquet file:

DataFrame dfschemaPeople = sqlContext.createDataFrame(people, Person.class);
dfschemaPeople.write().parquet("/user/johndoe/spark/data/out/people.parquet");    

The Address class is:

import java.io.Serializable;
public class Address implements Serializable
{
    public Address(String city, String block)
    {
        super();
        City = city;
        Block = block;
    }
    private static final long serialVersionUID = 1L;
    private String City;
    private String Block;
    //Omitting getters and setters
}

I get the error:

Caused by: java.lang.ClassCastException: com.test.schema.Address cannot be cast to org.apache.spark.sql.Row

I am running spark-1.4.1.

  • Is this a known bug?
  • If I do the same by importing a nested JSON file of the same format, I am able to save to parquet (a sketch of that path follows after this list).
  • Even if I create a sub DataFrame like DataFrame dfSubset = sqlContext.sql("SELECT address.city FROM PersonTable"); I still get the same error.
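
For reference, the JSON route from the second bullet can look roughly like this with the Spark 1.4 Java API; the people.json path and its contents are assumptions for illustration, not part of the original question:

// Assumed input file people.json, one JSON object per line, e.g.:
// {"name":"John","age":"2","address":{"city":"HomeAdd","block":"141H"}}
DataFrame jsonPeople = sqlContext.read().json("/user/johndoe/spark/data/people.json");

// The schema (including the nested address struct) is inferred from the JSON,
// so the parquet write succeeds.
jsonPeople.write().parquet("/user/johndoe/spark/data/out/people_json.parquet");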

So what gives? How can I read a complex data structure from a text file and save it as parquet? It seems I cannot.

Recommended Answer

You are using the Java API, which has a limitation here.

From the Spark documentation: http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#interoperating-with-rdds

Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain nested or contain complex types such as Lists or Arrays. You can create a JavaBean by creating a class that implements Serializable and has getters and setters for all of its fields.

With Scala case classes it will work (updated to write to parquet format):

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// Nested case classes are supported: Spark SQL derives the schema via Scala reflection.
case class Address(city: String, block: String)
case class Person(name: String, age: String, address: Address)

object Test2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val people = sc.parallelize(List(
      Person("a", "b", Address("a", "b")),
      Person("c", "d", Address("c", "d"))))

    val df = sqlContext.createDataFrame(people)
    df.write.mode("overwrite").parquet("/tmp/people.parquet")
  }
}
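
If you need to stay in the Java API, one commonly used workaround (not part of the original answer) is to map each bean to a Row against an explicit nested StructType and build the DataFrame from those rows. A minimal sketch, assuming the Person and Address beans from the question expose standard getters (getName(), getAddress(), getCity(), ...):

// Needs org.apache.spark.sql.{DataFrame, Row, RowFactory},
// org.apache.spark.sql.types.{DataTypes, StructType} and java.util.Arrays.

// Describe the nested schema explicitly: address is a struct of two strings.
StructType addressType = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("city", DataTypes.StringType, true),
        DataTypes.createStructField("block", DataTypes.StringType, true)));
StructType personType = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("name", DataTypes.StringType, true),
        DataTypes.createStructField("age", DataTypes.StringType, true),
        DataTypes.createStructField("address", addressType, true)));

// Convert each Person bean into a Row whose third field is itself a Row.
JavaRDD<Row> rows = people.map(new Function<Person, Row>()
    {
        public Row call(Person p)
        {
            Address a = p.getAddress();
            return RowFactory.create(p.getName(), p.getAge(),
                    RowFactory.create(a.getCity(), a.getBlock()));
        }
    });

DataFrame dfRows = sqlContext.createDataFrame(rows, personType);
dfRows.write().parquet("/user/johndoe/spark/data/out/people.parquet");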
