Write to elasticsearch from Spark is very slow
Question
I am processing a text file and writing the transformed rows from a Spark application to Elasticsearch as below:
input.write.format("org.elasticsearch.spark.sql")
.mode(SaveMode.Append)
.option("es.resource", "{date}/" + dir).save()
This runs very slowly and takes around 8 minutes to write 287.9 MB / 1,513,789 records.
How can I tune Spark and Elasticsearch settings to make it faster, given that network latency will always be there?
I am using Spark in local mode on a machine with 16 cores and 64 GB RAM. My Elasticsearch cluster has one master node and 3 data nodes, each with 16 cores and 64 GB RAM.
I am reading the text file as below:
val readOptions: Map[String, String] = Map(
  "ignoreLeadingWhiteSpace" -> "true",
  "ignoreTrailingWhiteSpace" -> "true",
  "inferSchema" -> "false",
  "header" -> "false",
  "delimiter" -> "\t",
  "comment" -> "#",
  "mode" -> "PERMISSIVE")
....
val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)
Answer
First, let's start with what's happening in your application. Apache Spark is reading one (not so big) compressed csv file. So Spark will first spend time decompressing the data and scanning it before writing it to Elasticsearch.
This will create a Dataset/DataFrame with a single partition (confirmed by the result of df.rdd.getNumPartitions mentioned in the comments).
One straightforward solution would be to repartition your data on read and cache it before writing it to Elasticsearch. I'm not sure what your data looks like, so deciding on the number of partitions is a matter of benchmarking on your side.
val input = sqlContext.read.options(readOptions)
.csv(inputFile.getAbsolutePath)
.repartition(100) // 100 is just an example
.cache
I'm not sure how much benefit this will bring to your application, because I believe there may be other bottlenecks (network IO, disk type on the ES nodes).
PS: I would recommend converting the csv files to Parquet before building ETL on top of them. There is a real performance gain there (personal opinion and benchmarks). A minimal sketch of that conversion is shown below.
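For illustration, here is a hedged sketch of that one-time csv-to-Parquet conversion, reusing the readOptions, inputFile and sqlContext names from the question; the staging path is an assumption, not something from the original post.

// Hypothetical one-time conversion: read the tab-delimited csv once,
// write it out as Parquet, then build the ETL on top of the Parquet copy.
val raw = sqlContext.read
  .options(readOptions)                    // same read options as above
  .csv(inputFile.getAbsolutePath)

raw.write
  .mode(SaveMode.Overwrite)
  .parquet("/data/staging/input_parquet")  // assumed staging path

// Subsequent runs read the columnar, splittable Parquet copy instead.
val input = sqlContext.read.parquet("/data/staging/input_parquet")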
Another possible optimization would be to tweak the es.batch.size.entries setting of the elasticsearch-spark connector. The default value is 1000.
You need to be careful when setting this parameter because you might overload Elasticsearch. I strongly advise you to take a look at the available connector configurations here.
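As a reference, a sketch of passing that setting on the write path, based on the write snippet from the question; the values 4000 and 4mb are purely illustrative numbers to benchmark, not recommendations.

// Sketch: raising the bulk batch size for the elasticsearch-spark connector.
// Both values below are examples only; benchmark them against your cluster.
input.write.format("org.elasticsearch.spark.sql")
  .mode(SaveMode.Append)
  .option("es.batch.size.entries", "4000")  // default is 1000
  .option("es.batch.size.bytes", "4mb")     // default is 1mb; tune together
  .option("es.resource", "{date}/" + dir)
  .save()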
I hope this helps!