Elasticsearch + Spark: write json with custom document _id

Question

I am trying to write a collection of objects to Elasticsearch from Spark. I have to meet two requirements:
- the documents are already serialized as JSON and should be written as-is
- the Elasticsearch document _id should be provided

Here's what I tried so far.
I tried to use saveJsonToEs() like this (the serialized documents contain a field _id with the desired Elasticsearch ID):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id"),
("es.mapping.exclude", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
But the elasticsearch-hadoop library gives this exception:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)
If I remove es.mapping.exclude but keep es.mapping.id and send a JSON with _id inside (like {"_id":"blah",...}):
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
I get this error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
...
When I try to send this id as a different field (like {"superID":"blah",...}):
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "superID")
)
EsSpark.saveJsonToEs(rdd, cfg)
it fails to extract the field:
17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
When I remove es.mapping.id and es.mapping.exclude from the configuration, it works but the document id is generated by Elasticsearch (which violates requirement 2):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg)
saveToEsWithMeta()

There is another function, saveToEsWithMeta(), that provides the _id and other metadata for inserting. It solves requirement 2 but fails requirement 1.
val rdd: RDD[(String, String)] = job.map{
r => r._id -> r.toJson()
}
val cfg = Map(
("es.resource", "myindex/mytype")
)
EsSpark.saveToEsWithMeta(rdd, cfg)
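For contrast, saveToEsWithMeta does work when the documents are handed over as deserialized structures instead of JSON strings. A minimal sketch under that assumption (the fields foo and bar are illustrative, not from the original job):

```scala
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

// Each pair is (document _id, document as a Scala Map) -- no pre-serialized JSON.
// The connector serializes the Map itself, so this gives up requirement 1.
val mapRdd: RDD[(String, Map[String, Any])] = job.map { r =>
  r._id -> Map("foo" -> r.foo, "bar" -> r.bar) // hypothetical fields
}
EsSpark.saveToEsWithMeta(mapRdd, "myindex/mytype")
```

This satisfies requirement 2 only; the objects would have to be re-serialized by the connector rather than written as-is.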
In fact, Elasticsearch is not even able to parse what elasticsearch-hadoop sends:
Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
The question

Is it possible to write a collection of (documentID, serializedDocument) from Spark into Elasticsearch (using elasticsearch-hadoop)?
P.S. I am using Elasticsearch 5.6.3 and Spark 2.1.1.
Answer
Finally I found the problem: it was a typo in the config.
[JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
It was looking for a field superId, but there was only superID (note the case). The question is also a bit misleading in this respect, since in the code it appears as ("es.mapping.id", "superID") (which was not what the actual config contained).
The actual solution is what Levi Ramsey suggested:
import org.elasticsearch.spark.rdd.EsSpark

val json = """{"foo":"bar","superID":"deadbeef"}"""
val rdd = spark.makeRDD(Seq(json))
val cfg = Map(
("es.mapping.id", "superID"),
("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg = cfg)
The difference is that es.mapping.id cannot be _id (as was indicated in the original post, _id is metadata and Elasticsearch does not accept it).
Naturally it means that the new field superID should be added to the mapping (unless the mapping is dynamic). If storing the additional field in the index is a burden, one should also:

- exclude it from the mapping
- and disable its indexing
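A sketch of what such a mapping body could look like on Elasticsearch 5.6 (index/type/field names taken from the examples above; the keyword type is an assumption):

```json
{
  "mappings": {
    "mytype": {
      "_source": { "excludes": ["superID"] },
      "properties": {
        "superID": { "type": "keyword", "index": false }
      }
    }
  }
}
```

The _source excludes setting keeps the field out of the stored document, and "index": false stops it from being indexed for search.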
Thanks a lot to Alex Savitsky for pointing in the right direction.