Write to elasticsearch from spark is very slow


Problem description

I am processing a text file and writing the transformed rows from a Spark application to Elasticsearch as below:

input.write.format("org.elasticsearch.spark.sql")
      .mode(SaveMode.Append)
      .option("es.resource", "{date}/" + dir).save()

This runs very slowly and takes around 8 minutes to write 287.9 MB / 1,513,789 records.

How can I tune the Spark and Elasticsearch settings to make it faster, given that network latency will always be there?

I am using Spark in local mode and have 16 cores and 64GB of RAM. My Elasticsearch cluster has one master node and 3 data nodes, each with 16 cores and 64GB of RAM.

I am reading the text file as below:

 val readOptions: Map[String, String] = Map("ignoreLeadingWhiteSpace" -> "true",
  "ignoreTrailingWhiteSpace" -> "true",
  "inferSchema" -> "false",
  "header" -> "false",
  "delimiter" -> "\t",
  "comment" -> "#",
  "mode" -> "PERMISSIVE")

....

val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)


Recommended answer

First, let's start with what's happening in your application. Apache Spark is reading one (not so big) compressed CSV file, so Spark will first spend time decompressing and scanning the data before writing it to Elasticsearch.

This will create a Dataset/DataFrame with a single partition (confirmed by the result of the df.rdd.getNumPartitions call you mentioned in the comments).
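
For reference, here is a minimal way to check this on the DataFrame from the question (assuming it is the input value defined above):

// A single compressed CSV file is not splittable, so this typically prints 1
val numPartitions = input.rdd.getNumPartitions
println(s"number of partitions: $numPartitions")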

One straightforward solution would be to repartition your data on read and cache it before writing it to Elasticsearch. I'm not sure what your data looks like, so deciding on the number of partitions is a matter of benchmarking on your side.

val input = sqlContext.read.options(readOptions)
                      .csv(inputFile.getAbsolutePath)
                      .repartition(100) // 100 is just an example
                      .cache

I'm not sure how much benefit this will bring to your application, because I believe there might be other bottlenecks (network IO, disk type on the ES nodes).

PS: I would suggest converting the CSV to Parquet files before building ETL on top of them. There is a real performance gain there. (personal opinion and benchmarks)
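
As an illustration only (the output path below is an assumption, not from the question), such a one-off conversion could look like this, reusing the readOptions and sqlContext from above:

// One-off conversion: read the raw tab-separated file once and persist it as Parquet,
// a columnar, splittable format that later ETL runs can read much faster.
val raw = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)
raw.write.mode(SaveMode.Overwrite).parquet("/data/staging/input_parquet") // hypothetical path

// Subsequent jobs read the Parquet copy instead of the original CSV.
val input = sqlContext.read.parquet("/data/staging/input_parquet")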

Another possible optimization would be to tweak the es.batch.size.entries setting of the elasticsearch-spark connector. The default value is 1000.
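
For example, the option can be passed on the write, alongside the existing ones (the value 5000 below is only a placeholder to benchmark, not a recommendation):

input.write.format("org.elasticsearch.spark.sql")
      .mode(SaveMode.Append)
      .option("es.batch.size.entries", "5000") // placeholder value, benchmark for your cluster
      .option("es.resource", "{date}/" + dir)
      .save()

Larger bulk requests mean fewer round trips over the network, but they also put more memory and indexing pressure on the Elasticsearch nodes.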

You need to be careful when setting this parameter because you might overload Elasticsearch. I strongly advise you to take a look at the available configurations here.

I hope this helps!

