How to upsert or partially update documents with scripts in Elasticsearch with Spark?
Question
I have pseudocode in Python that reads from a Kafka stream and upserts documents in Elasticsearch, incrementing a counter "view" if the document already exists:
for message in consumer:
    msg = json.loads(message.value)
    print(msg)
    index = INDEX_NAME
    es_id = msg["id"]
    script = {"script": "ctx._source.view += 1", "upsert": msg}
    es.update(index=index, doc_type="test", id=es_id, body=script)
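The script-with-upsert body above follows Elasticsearch's update API semantics: run the script if the document exists, otherwise index the "upsert" document as-is. A minimal local simulation of that behavior, for illustration only (no ES call is made; the helper and sample message below are made up):

```python
def simulate_upsert(index_store, es_id, body):
    # Mimic what Elasticsearch's update API does with a script + upsert body:
    # if the document exists, apply the script; otherwise index the upsert doc.
    if es_id in index_store:
        index_store[es_id]["view"] += 1   # effect of "ctx._source.view += 1"
    else:
        index_store[es_id] = dict(body["upsert"])
    return index_store[es_id]

store = {}
msg = {"id": "doc-1", "url": "http://example.com", "view": 1}
body = {"script": "ctx._source.view += 1", "upsert": msg}

simulate_upsert(store, msg["id"], body)        # first call indexes the upsert doc
doc = simulate_upsert(store, msg["id"], body)  # second call runs the increment
```

After the second call, doc["view"] is 2: the first message created the document, the second incremented the counter.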
Since I want to use it in a distributed environment, I am using Spark Structured Streaming:
df.writeStream \
    .format("org.elasticsearch.spark.sql") \
    .queryName("ESquery") \
    .option("es.resource", "credentials/url") \
    .option("checkpointLocation", "checkpoint") \
    .start()
or Spark Streaming in Scala that reads from the Kafka stream:
// Initializing Spark Streaming context and Kafka stream
sparkConf.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
[...]
val messages = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topicsSet, kafkaParams)
)
[...]
val urls = messages.map(record => JsonParser.parse(record.value()).values.asInstanceOf[Map[String, Any]])
urls.saveToEs("credentials/credential")
saveToEs(...) is the API of elastic-hadoop.jar documented here. Unfortunately this repo is not really well documented, so I cannot understand where I can put the script command.
Can anyone help me? Thanks in advance.
Answer
You should be able to do it by setting the write mode "update" (or "upsert") and passing your script via the script setting (the exact key depends on the ES version):
EsSpark.saveToEs(rdd, "spark/docs", Map(
    "es.mapping.id" -> "id",
    "es.write.operation" -> "update",
    "es.update.script.inline" -> "your script"
))
Probably you want to use "upsert".
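For the Python Structured Streaming pipeline in the question, the same settings can be passed as writer options. This is a sketch under the assumption that "es.update.script.inline" is the right key for your ES version (older elasticsearch-hadoop releases used "es.update.script"); apply_es_options is a made-up helper:

```python
# Hypothetical: elasticsearch-hadoop settings for a scripted upsert,
# mirroring the Scala Map from the answer above.
es_options = {
    "es.mapping.id": "id",                               # take the doc id from the "id" field
    "es.write.operation": "upsert",                      # insert if absent, else run the script
    "es.update.script.inline": "ctx._source.view += 1",  # the increment from the question
}

def apply_es_options(writer, options):
    # Chain .option(key, value) calls on a Spark DataStreamWriter-like object.
    for key, value in options.items():
        writer = writer.option(key, value)
    return writer

# In the actual job this would look like:
# apply_es_options(df.writeStream.format("org.elasticsearch.spark.sql"), es_options) \
#     .option("checkpointLocation", "checkpoint") \
#     .start("spark/docs")
```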
There are some good unit tests in the cascading integration in the same library; those settings should work for Spark too, since both use the same writer. I suggest reading the unit tests to pick the correct settings for your ES version.