Elasticsearch + Spark: write json with custom document _id


Problem description

I am trying to write a collection of objects in Elasticsearch from Spark. I have to meet two requirements:

  1. The documents are already serialized in JSON and should be written as-is
  2. The Elasticsearch document _id should be provided

Here's what I tried so far.

I tried to use saveJsonToEs() like this (the serialized documents contain the field _id with the desired Elasticsearch ID):

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id"),
  ("es.mapping.exclude", "_id")
)

EsSpark.saveJsonToEs(rdd, cfg)

But the elasticsearch-hadoop library gives this exception:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
    at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
    at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)

If I remove es.mapping.exclude but keep es.mapping.id and send a JSON with _id inside (like {"_id":"blah",...})

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id")
)

EsSpark.saveJsonToEs(rdd, cfg)

I get this error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
...

When I try to send this id as a different field (like {"superID":"blah",...}):

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "superID")
)

EsSpark.saveJsonToEs(rdd, cfg)

it fails to extract the field:

17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

When I remove es.mapping.id and es.mapping.exclude from the configuration, it works but the document id is generated by Elasticsearch (which violates requirement 2):

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype")
)

EsSpark.saveJsonToEs(rdd, cfg)

saveToEsWithMeta()

There is another function that provides _id and other metadata for inserting: saveToEsWithMeta(). It allows solving requirement 2 but fails with requirement 1.

val rdd: RDD[(String, String)] = job.map{
  r => r._id -> r.toJson()
}

val cfg = Map(
  ("es.resource", "myindex/mytype")
)

EsSpark.saveToEsWithMeta(rdd, cfg)

In fact, Elasticsearch is not even able to parse what elasticsearch-hadoop sends:

Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)

The question

Is it possible to write a collection of (documentID, serializedDocument) from Spark into Elasticsearch (using elasticsearch-hadoop)?

P.S. I am using Elasticsearch 5.6.3 and Spark 2.1.1.

The answer

Finally I found the problem: it was a typo in the config.

[JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]

It was looking for a field superId but the documents only contained superID (note the case). The question is also a bit misleading in this respect, since the code shown above uses "es.mapping.id", "superID" (which is not what the failing configuration actually contained).

The actual solution is what Levi Ramsey suggested:

val json = """{"foo":"bar","superID":"deadbeef"}"""

val rdd = spark.makeRDD(Seq(json))
val cfg = Map(
  ("es.mapping.id", "superID"),
  ("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg = cfg)

The difference is that es.mapping.id cannot be _id (as the error in the question shows, _id is a metadata field and Elasticsearch does not accept it inside a document).
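
As a quick sanity check, one can read the index back as (_id, document) pairs and confirm that the keys are the custom superID values rather than auto-generated IDs. This is only a minimal sketch: it assumes a SparkContext named sc and the same myindex/mytype resource used above.

import org.elasticsearch.spark.rdd.EsSpark

// esJsonRDD returns an RDD of (documentId, jsonString) pairs read from the index
EsSpark.esJsonRDD(sc, "myindex/mytype")
  .take(5)
  .foreach { case (id, doc) => println(s"_id=$id -> $doc") }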

Naturally, it means that the new field superID should be added to the mapping (unless the mapping is dynamic). If storing an additional field in the index is a burden, one should also (see the sketch below):

  • exclude it from the mapping
  • and disable its indexing
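
Here is a minimal sketch of creating the index up front with such a mapping. It is only an illustration, not part of the original setup: it assumes Elasticsearch 5.x single-type mapping syntax, a cluster reachable at localhost:9200, and the superID field from the example above; adapt the names and the HTTP call to your environment.

// Hedged sketch: create "myindex" with "mytype" so that superID is excluded from
// _source and not indexed; it is then only used by es.mapping.id to derive the
// document _id. Assumes ES 5.x mapping syntax and a cluster at localhost:9200.
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

val mapping =
  """{
    |  "mappings": {
    |    "mytype": {
    |      "_source": { "excludes": ["superID"] },
    |      "properties": {
    |        "superID": { "type": "keyword", "index": false }
    |      }
    |    }
    |  }
    |}""".stripMargin

val conn = new URL("http://localhost:9200/myindex")
  .openConnection()
  .asInstanceOf[HttpURLConnection]
conn.setRequestMethod("PUT")
conn.setRequestProperty("Content-Type", "application/json")
conn.setDoOutput(true)
conn.getOutputStream.write(mapping.getBytes(StandardCharsets.UTF_8))
println(s"create index: HTTP ${conn.getResponseCode}")

With a mapping like that in place, the saveJsonToEs() call above can keep superID inside the serialized JSON while the field is neither searchable nor stored in _source.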

Thanks a lot to Alex Savitsky for pointing in the right direction.
