How to upsert or partially update documents with scripts in Elasticsearch with Spark?


Problem description

I have pseudocode in Python that reads from a Kafka stream and upserts documents into Elasticsearch (incrementing a counter view if the document already exists):

import json
from elasticsearch import Elasticsearch
from kafka import KafkaConsumer

consumer = KafkaConsumer(KAFKA_TOPIC)  # placeholder: topic name
es = Elasticsearch()

for message in consumer:
    msg = json.loads(message.value)
    print(msg)
    index = INDEX_NAME
    es_id = msg["id"]
    # increment the counter if the doc exists, otherwise index msg as-is
    script = {"script": "ctx._source.view+=1", "upsert": msg}
    es.update(index=index, doc_type="test", id=es_id, body=script)

Since I want to use it in a distributed environment, I am using Spark Structured Streaming:

df.writeStream \
.format("org.elasticsearch.spark.sql")\
.queryName("ESquery")\
.option("es.resource","credentials/url") \
.option("checkpointLocation", "checkpoint").start()
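The writeStream snippet above only indexes documents; to get scripted-upsert behaviour, the connector settings have to be passed as options on the writer. A minimal sketch of collecting and applying those settings, assuming the elasticsearch-hadoop 5.x option names (the script and the "id" mapping field are placeholders, not from the question):

```python
# Connector options for a scripted upsert, assuming elasticsearch-hadoop
# 5.x setting names; the script and the "id" field are placeholders.
es_upsert_options = {
    "es.mapping.id": "id",                             # document field used as _id
    "es.write.operation": "upsert",                    # index if absent, else run script
    "es.update.script.inline": "ctx._source.view+=1",  # runs when the doc exists
}

def apply_es_options(writer, options):
    """Chain each setting onto a DataStreamWriter-like object via .option()."""
    for key, value in options.items():
        writer = writer.option(key, value)
    return writer
```

With a real PySpark writer this would be chained before `.start()`, e.g. `apply_es_options(df.writeStream.format("org.elasticsearch.spark.sql"), es_upsert_options).start()`.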

or Spark Streaming in Scala that reads from a Kafka stream:

// Initializing Spark Streaming Context and kafka stream
sparkConf.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
[...] 
val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicsSet, kafkaParams)
    )

[...]
val urls = messages.map(record => JsonParser.parse(record.value()).values.asInstanceOf[Map[String, Any]])
urls.saveToEs("credentials/credential")

.saveToEs(...) is the API of elasticsearch-hadoop.jar documented here. Unfortunately, this repo is not really well documented, so I cannot understand where I can put the script command.

Can anyone help me? Thank you in advance.

Answer

You should be able to do it by setting the write mode to "update" (or "upsert") and passing your script as "script" (the exact setting name depends on the ES version):

EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id", "es.write.operation" -> "update", "es.update.script.inline" -> "your script"))

Probably you want to use "upsert".
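Putting that together, a sketch of the upsert variant (the index "spark/docs" comes from the answer above, while the view-counter script and the painless language setting are assumptions based on the question, so verify the option names against the elasticsearch-hadoop configuration reference for your ES version):

```scala
import org.elasticsearch.spark.rdd.EsSpark

// Sketch: scripted upsert; option names assume elasticsearch-hadoop 5.x.
EsSpark.saveToEs(rdd, "spark/docs", Map(
  "es.mapping.id"           -> "id",                   // field used as _id
  "es.write.operation"      -> "upsert",               // create doc if missing
  "es.update.script.inline" -> "ctx._source.view+=1",  // run when doc exists
  "es.update.script.lang"   -> "painless"              // assumed: ES 5.x+ default
))
```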

There are some good unit tests in the Cascading integration in the same library; those settings should work for Spark as well, since both use the same writer.

I suggest reading the unit tests to pick the correct settings for your ES version.
