Save Spark Dataframe into Elasticsearch - Can't handle type exception


I have designed a simple job to read data from MySQL and save it in Elasticsearch with Spark.

Here is the code:

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("MySQLtoEs")
                .set("es.index.auto.create", "true")
                .set("es.nodes", "127.0.0.1:9200")
                .set("es.mapping.id", "id")
                .set("spark.serializer", KryoSerializer.class.getName()));

SQLContext sqlContext = new SQLContext(sc);

// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");

// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
        "merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");

You can see the code is very straightforward. It reads the data into a DataFrame, selects some columns, and then performs a count as a basic action on the DataFrame. Everything works fine up to this point.

Then it tries to save the data into Elasticsearch, but it fails because it cannot handle some type. You can see the error log here.

I'm not sure why it can't handle that type. Does anyone know why this is occurring?

I'm using Apache Spark 1.5.0, Elasticsearch 1.4.4 and elasticsearch-hadoop 2.1.1.

EDIT:

  • I have updated the gist link with a sample dataset along with the source code.
  • I have also tried to use the elasticsearch-hadoop dev builds as mentioned by @costin on the mailing list.

Solution

The answer to this one was tricky, but thanks to samklr, I managed to figure out what the problem was.

Nevertheless, the solution isn't straightforward and involves some "unnecessary" transformations.

First let's talk about Serialization.

There are two aspects of serialization to consider in Spark: serialization of data and serialization of functions. In this case, it's about data serialization and thus deserialization.

From Spark's perspective, the only thing required is setting up serialization - by default Spark relies on Java serialization, which is convenient but fairly inefficient. This is the reason why Hadoop itself introduced its own serialization mechanism and its own types - namely Writables. As such, InputFormats and OutputFormats are required to return Writables which, out of the box, Spark does not understand.
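
As an aside (my illustration, not part of the original post), the usual workaround when Spark has to consume Hadoop Writables directly is to convert them to plain Java types right after loading, so the rest of the job never has to serialize Writables. The SequenceFile path and key/value types below are hypothetical, and sc is the JavaSparkContext from the question.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Read a SequenceFile of (IntWritable, Text) and immediately map the
// Writables to plain Java types that Spark's serializers understand.
JavaPairRDD<IntWritable, Text> raw =
        sc.sequenceFile("hdfs:///tmp/offers.seq", IntWritable.class, Text.class);

JavaPairRDD<Integer, String> plain = raw.mapToPair(
        new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
                return new Tuple2<>(pair._1().get(), pair._2().toString());
            }
        });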

With the elasticsearch-spark connector one must enable a different serialization (Kryo) which handles the conversion automatically and also does this quite efficiently.

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

Even better, Kryo does not require that a class implement a particular interface to be serialized, which means POJOs can be used in RDDs without any further work beyond enabling Kryo serialization.

That said, @samklr pointed out to me that Kryo needs to register classes before using them.

This is because Kryo writes a reference to the class of the object being serialized (one reference is written for every object written), which is just an integer identifier if the class has been registered but is the full classname otherwise. Spark registers Scala classes and many other framework classes (like Avro Generic or Thrift classes) on your behalf.

Registering classes with Kryo is straightforward. Create a class that implements KryoRegistrator and override the registerClasses() method:

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

import java.io.Serializable;

public class MyKryoRegistrator implements KryoRegistrator, Serializable {
    @Override
    public void registerClasses(Kryo kryo) {
        // Product POJO associated with a product Row from the DataFrame
        kryo.register(Product.class);
    }
}

Finally, in your driver program, set the spark.kryo.registrator property to the fully qualified classname of your KryoRegistrator implementation:

conf.set("spark.kryo.registrator", "MyKryoRegistrator")

Secondly, even though the Kryo serializer was set and the class registered, with the changes made in Spark 1.5, for some reason Elasticsearch still couldn't deserialize the DataFrame, because it couldn't infer the SchemaType of the DataFrame for the connector.

So I had to convert the DataFrame to a JavaRDD:

JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
    public Product call(Row row) throws Exception {
        long id = row.getLong(0);
        String title = row.getString(1);
        String description = row.getString(2);
        int merchantId = row.getInt(3);
        double price = row.getDecimal(4).doubleValue();
        String keywords = row.getString(5);
        long brandId = row.getLong(6);
        int categoryId = row.getInt(7);
        return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
    }
});
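
The mapping above, and the earlier Kryo registration, assume a Product POJO that the post never shows. A minimal sketch of it follows, with field names and types inferred from the Row accessors (the real class may differ; elasticsearch-hadoop will also typically expect bean-style getters so that the fields end up in the JSON document):

import java.io.Serializable;

public class Product implements Serializable {
    private long id;
    private String title;
    private String description;
    private int merchantId;
    private double price;
    private String keywords;
    private long brandId;
    private int categoryId;

    public Product() {} // no-arg constructor keeps serializers happy

    public Product(long id, String title, String description, int merchantId,
                   double price, String keywords, long brandId, int categoryId) {
        this.id = id;
        this.title = title;
        this.description = description;
        this.merchantId = merchantId;
        this.price = price;
        this.keywords = keywords;
        this.brandId = brandId;
        this.categoryId = categoryId;
    }

    public long getId() { return id; }
    // remaining getters (and setters, if needed) follow the same pattern
}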

Now the data is ready to be written into Elasticsearch:

JavaEsSpark.saveToEs(products, "test/test");
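
For reference, the same save can target the original "offers/product" resource and carry the document-id mapping per call instead of on the SparkConf; JavaEsSpark (in org.elasticsearch.spark.rdd.api.java) has an overload that takes a settings map. Treat the exact options here as an assumption:

// Per-call elasticsearch-hadoop settings instead of SparkConf-wide ones
Map<String, String> esCfg = new HashMap<>();
esCfg.put("es.mapping.id", "id");
JavaEsSpark.saveToEs(products, "offers/product", esCfg);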

References:

  • Elasticsearch's Apache Spark support documentation.
  • Hadoop: The Definitive Guide, 4th ed., Chapter 19 (Spark) – Tom White.
  • User samklr.
