Save Spark Dataframe into Elasticsearch - Can't handle type exception


Question

I have designed a simple job to read data from MySQL and save it in Elasticsearch with Spark.

Here is the code:

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("MySQLtoEs")
                .set("es.index.auto.create", "true")
                .set("es.nodes", "127.0.0.1:9200")
                .set("es.mapping.id", "id")
                .set("spark.serializer", KryoSerializer.class.getName()));

SQLContext sqlContext = new SQLContext(sc);

// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");

// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
        "merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");

You can see the code is very straightforward. It reads the data into a DataFrame, selects some columns and then performs a count as a basic action on the DataFrame. Everything works fine up to this point.

Then it tries to save the data into Elasticsearch, but it fails because it cannot handle some type. You can see the error log here.

I'm not sure about why it can't handle that type. Does anyone know why this is occurring?

I'm using Apache Spark 1.5.0, Elasticsearch 1.4.4 and elasticsearch-hadoop 2.1.1.

  • I have updated the gist link with a sample dataset along with the source code.
  • I have also tried to use the elasticsearch-hadoop dev builds as mentioned by @costin on the mailing list.

Answer

The answer for this one was tricky, but thanks to samklr, I managed to figure out what the problem was.

Nevertheless, the solution isn't straightforward and may involve some "unnecessary" transformations.

First, let's talk about serialization.

There are two aspects of serialization to consider in Spark: serialization of data and serialization of functions. In this case, it's about data serialization and thus de-serialization.

From Spark’s perspective, the only thing required is setting up serialization - Spark relies by default on Java serialization which is convenient but fairly inefficient. This is the reason why Hadoop itself introduced its own serialization mechanism and its own types - namely Writables. As such, InputFormat and OutputFormats are required to return Writables which, out of the box, Spark does not understand.

With the elasticsearch-spark connector one must enable a different serialization (Kryo) which handles the conversion automatically and also does this quite efficiently.

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

Better still, Kryo does not require a class to implement a particular interface in order to be serialized, which means POJOs can be used in RDDs without any further work beyond enabling Kryo serialization.

That said, @samklr pointed out to me that Kryo needs classes to be registered before using them.

This is because Kryo writes a reference to the class of the object being serialized (one reference is written for every object written), which is just an integer identifier if the class has been registered but is the full classname otherwise. Spark registers Scala classes and many other framework classes (like Avro Generic or Thrift classes) on your behalf.

Registering classes with Kryo is straightforward. Create a class that implements KryoRegistrator and override the registerClasses() method:

import java.io.Serializable;
import org.apache.spark.serializer.KryoRegistrator;
import com.esotericsoftware.kryo.Kryo;

public class MyKryoRegistrator implements KryoRegistrator, Serializable {
    @Override
    public void registerClasses(Kryo kryo) {
        // Product is the POJO associated with a product Row from the DataFrame
        kryo.register(Product.class);
    }
}

Finally, in your driver program, set the spark.kryo.registrator property to the fully qualified classname of your KryoRegistrator implementation:

conf.set("spark.kryo.registrator", "MyKryoRegistrator")

Secondly, even though the Kryo serializer is set and the class registered, with the changes made in Spark 1.5, Elasticsearch for some reason couldn't de-serialize the DataFrame, because the connector can't infer the DataFrame's SchemaType.

So I had to convert the DataFrame to a JavaRDD of Product objects:

JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
    public Product call(Row row) throws Exception {
        long id = row.getLong(0);
        String title = row.getString(1);
        String description = row.getString(2);
        int merchantId = row.getInt(3);
        double price = row.getDecimal(4).doubleValue();
        String keywords = row.getString(5);
        long brandId = row.getLong(6);
        int categoryId = row.getInt(7);
        return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
    }
});
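
The Product class itself isn't shown in the original post; below is a rough sketch of what such a POJO might look like, assuming field names that simply mirror the selected columns (getters beyond the first two are omitted for brevity):

// Hypothetical Product POJO matching the constructor used above. The real class
// is not shown in the original post; this is only an assumed shape.
public class Product implements java.io.Serializable {
    private long id;
    private String title;
    private String description;
    private int merchantId;
    private double price;
    private String keywords;
    private long brandId;
    private int categoryId;

    public Product(long id, String title, String description, int merchantId,
                   double price, String keywords, long brandId, int categoryId) {
        this.id = id;
        this.title = title;
        this.description = description;
        this.merchantId = merchantId;
        this.price = price;
        this.keywords = keywords;
        this.brandId = brandId;
        this.categoryId = categoryId;
    }

    public long getId() { return id; }
    public String getTitle() { return title; }
    // ... getters for the remaining fields follow the same pattern
}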

Now the data is ready to be written into Elasticsearch:

JavaEsSpark.saveToEs(products, "test/test");
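
To sanity-check the write, one option is to read the index back through the same connector; a quick sketch, assuming the test/test resource used above:

// Read the freshly written documents back as a pair RDD of (document id, field map)
// and log how many were indexed.
JavaPairRDD<String, Map<String, Object>> saved = JavaEsSpark.esRDD(sc, "test/test");
LOGGER.info("Documents indexed: " + saved.count());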

References:

  • Elasticsearch's Apache Spark support documentation.
  • Hadoop: The Definitive Guide, Chapter 19: Spark, 4th edition - Tom White.
  • User samklr.
