ElasticSearch to Spark RDD


Question

I was testing the ElasticSearch and Spark integration on my local machine, using some test data loaded into Elasticsearch:

import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.elasticsearch.hadoop.mr.EsInputFormat

val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)

val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")

val esRDD = sc.hadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()

The code runs fine, and esRDD.first() returns the correct result.

However, esRDD.collect() generates an exception:

java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I believe this is related to the issue mentioned at http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html, so I added this line accordingly:

conf.set("spark.serializer", classOf[KryoSerializer].getName)

Am I supposed to do something else to get it working? Thank you.

Update: the serialization setup problem was solved by using

sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)

instead of

conf.set("spark.serializer", classOf[KryoSerializer].getName)

Now there is another problem. There are 1000 distinct records in this dataset:

esRDD.count()

returns 1000 with no problem; however,

esRDD.distinct().count()

returns 5! If I print the records with

esRDD.foreach(println)

it prints out the 1000 records correctly. But if I use collect or take:

esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)

it prints DUPLICATED records, and indeed only 5 UNIQUE records show up, which seem to be a random subset of the entire dataset - it is not the first 5 records. If I save the RDD and read it back:

esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)

esRDD2 behaves as expected. I wonder whether there is a bug, whether there is something I don't understand about the behavior of collect/take, or whether it is because I'm running everything locally. By default the Spark RDD seems to use 5 partitions, as shown by the number of part-xxxx files in the "spark-output" directory. That is probably why esRDD.collect() and esRDD.distinct() returned 5 unique records, rather than some other random number. But that is still not right.

Answer

You should use the following code to initialize:

val sparkConf = new SparkConf()
  .setAppName("Test")
  .setMaster("local")
  .set("spark.serializer", classOf[KryoSerializer].getName)
val sc = new SparkContext(sparkConf)

val conf = new JobConf()
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
