Save Spark Dataframe into Elasticsearch - Can't handle type exception


Question

I have designed a simple job to read data from MySQL and save it in Elasticsearch with Spark.

Here is the code:

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("MySQLtoEs")
                .set("es.index.auto.create", "true")
                .set("es.nodes", "127.0.0.1:9200")
                .set("es.mapping.id", "id")
                .set("spark.serializer", KryoSerializer.class.getName()));

SQLContext sqlContext = new SQLContext(sc);

// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");

// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
        "merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");

You can see the code is very straightforward. It reads the data into a DataFrame, selects some columns and then performs a count as a basic action on the Dataframe. Everything works fine up to this point.

Then it tries to save the data into Elasticsearch, but it fails because it cannot handle some type. You can see the error log here.

I'm not sure about why it can't handle that type. Does anyone know why this is occurring?

I'm using Apache Spark 1.5.0, Elasticsearch 1.4.4 and elasticsearch-hadoop 2.1.1.

Edit:


  • I have updated the gist link with a sample dataset along with the source code.
  • I have also tried to use the elasticsearch-hadoop dev builds as mentioned by @costin on the mailing list.

Answer

The answer for this one was tricky, but thanks to samklr, I have managed to figure out what the problem was.

The solution isn't straightforward, though, and involves what might be considered some "unnecessary" transformations.

First, let's talk about serialization.

There are two aspects of serialization to consider in Spark: serialization of data and serialization of functions. In this case, it's about data serialization and thus de-serialization.

From Spark's perspective, the only thing required is setting up serialization - Spark relies by default on Java serialization, which is convenient but fairly inefficient. This is the reason why Hadoop itself introduced its own serialization mechanism and its own types - namely Writables. As such, InputFormats and OutputFormats are required to return Writables which, out of the box, Spark does not understand.

With the elasticsearch-spark connector one must enable a different serialization (Kryo) which handles the conversion automatically and also does this quite efficiently.

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

Note that Kryo does not require that a class implement a particular interface to be serialized, which means POJOs can be used in RDDs without any further work beyond enabling Kryo serialization.

That said, @samklr pointed out to me that Kryo needs to register classes before using them.

This is because Kryo writes a reference to the class of the object being serialized (one reference is written for every object written), which is just an integer identifier if the class has been registered but is the full classname otherwise. Spark registers Scala classes and many other framework classes (like Avro Generic or Thrift classes) on your behalf.

Registering classes with Kryo is straightforward. Create a class that implements KryoRegistrator and override the registerClasses() method:

import java.io.Serializable;
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

public class MyKryoRegistrator implements KryoRegistrator, Serializable {
    @Override
    public void registerClasses(Kryo kryo) {
        // Product POJO associated to a product Row from the DataFrame
        kryo.register(Product.class);
    }
}

Finally, in your driver program, set the spark.kryo.registrator property to the fully qualified classname of your KryoRegistrator implementation:

conf.set("spark.kryo.registrator", "MyKryoRegistrator")

Secondly, even though the Kryo serializer is set and the class is registered, with the changes made to Spark 1.5, for some reason Elasticsearch couldn't de-serialize the DataFrame because it can't infer the SchemaType of the DataFrame into the connector.

So I had to convert the DataFrame to a JavaRDD:

// Map each Row to a Product POJO (the DECIMAL price column is converted to double)
JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
    public Product call(Row row) throws Exception {
        long id = row.getLong(0);
        String title = row.getString(1);
        String description = row.getString(2);
        int merchantId = row.getInt(3);
        double price = row.getDecimal(4).doubleValue();
        String keywords = row.getString(5);
        long brandId = row.getLong(6);
        int categoryId = row.getInt(7);
        return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
    }
});
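
For reference, here's a minimal sketch of what the Product POJO could look like, assuming the field types used in the mapping above (the actual class isn't shown in the question):

import java.io.Serializable;

public class Product implements Serializable {
    private long id;
    private String title;
    private String description;
    private int merchantId;
    private double price;
    private String keywords;
    private long brandId;
    private int categoryId;

    public Product() { } // no-arg constructor for serialization frameworks

    public Product(long id, String title, String description, int merchantId,
                   double price, String keywords, long brandId, int categoryId) {
        this.id = id;
        this.title = title;
        this.description = description;
        this.merchantId = merchantId;
        this.price = price;
        this.keywords = keywords;
        this.brandId = brandId;
        this.categoryId = categoryId;
    }

    // Getter backing es.mapping.id; the remaining getters/setters follow the same pattern
    public long getId() { return id; }
}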

Now the data is ready to be written into Elasticsearch:

JavaEsSpark.saveToEs(products, "test/test");
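
Note that JavaEsSpark ships with the elasticsearch-hadoop artifact. As a reference point, the imports used across these snippets should be roughly the following (package paths as of elasticsearch-hadoop 2.1 and Spark 1.5; double-check them against your versions):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.elasticsearch.spark.sql.EsSparkSQL;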

References:

  • Elasticsearch's Apache Spark support documentation.
  • Hadoop: The Definitive Guide, 4th ed., Chapter 19: Spark – Tom White.
  • User samklr.
