Spark 2.2.0 S3 Performance


Question

I'm in the process of moving my code from the test lab cluster to an EC2 cluster. I set it up using flintrock and I'm running "vanilla" Spark 2.2.0. At the moment the cluster has 4 c3.2xlarge nodes (1 master, 3 workers).

I want to process a large set of files, each of which is itself relatively large (around 1 GB). In my code I slice the list of files into chunks. In the lab I found that performance on an 8 vCPU / 13 GB system peaks at about 32 files per chunk when saving the result to Parquet. On EC2 with 3 workers I translated this to a chunk size of 96, which results in 192 tasks. Now I'm confronted with bad S3 performance, and I get the following error:

17/09/09 03:45:33 INFO AmazonHttpClient: Unable to execute HTTP request: Read timed out
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
    at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
    at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
    at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:259)
    at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:209)
    at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
    at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
    at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:686)
    at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:488)
    at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:384)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.copyObject(AmazonS3Client.java:1507)
    at com.amazonaws.services.s3.transfer.internal.CopyCallable.copyInOneChunk(CopyCallable.java:143)
    at com.amazonaws.services.s3.transfer.internal.CopyCallable.call(CopyCallable.java:131)
    at com.amazonaws.services.s3.transfer.internal.CopyMonitor.copy(CopyMonitor.java:189)
    at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:134)
    at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:46)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
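
For reference, here is a minimal sketch of the chunking loop described above; the bucket, file names, file count, and JSON input format are placeholders, not my exact setup (spark is an existing SparkSession):

# Sketch of the chunking approach; the paths, file count and input
# format are placeholders. 'spark' is an existing SparkSession.
files = ['s3a://mybucket/input/part-%04d.json.gz' % i for i in range(288)]
chunk_size = 96  # 32 files per worker across 3 workers

for start in range(0, len(files), chunk_size):
    chunk = files[start:start + chunk_size]
    df = spark.read.json(chunk)  # read one chunk of input files
    df.write.mode('append').parquet('s3a://mybucket/result_parquet')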

I followed this guide: https://hortonworks.github.io/hdp-aws/s3-performance/

I changed the SparkConf setup to this:

conf = (SparkConf()
    .setAppName(appname)
    .setMaster(master)
    .set('spark.executor.memory', '13g')
    # use the v2 commit algorithm, which does fewer renames at job commit
    .set('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '2')
    # S3A "fast upload": buffer blocks on local disk before each PUT
    .set('fs.s3a.fast.upload', 'true')
    .set('fs.s3a.fast.upload.buffer', 'disk')
    .set('fs.s3a.buffer.dir', '/tmp/s3a'))

and save the Parquet output with s3a like this:

df.write.parquet('s3a://mybucket/result_parquet')
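
As an aside, I'm not sure whether the bare fs.s3a.* keys set on SparkConf above actually reach the Hadoop layer; here is a minimal sketch of setting the same options directly on the live Hadoop configuration instead, where sc is the SparkContext:

# Sketch: apply the s3a options on the running Hadoop configuration.
# 'sc' is the SparkContext; _jsc is PySpark's internal Java bridge.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.fast.upload', 'true')
hadoop_conf.set('fs.s3a.fast.upload.buffer', 'disk')
hadoop_conf.set('fs.s3a.buffer.dir', '/tmp/s3a')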

I also reduced the chunk size to 48 (16 per instance). The errors became less frequent, but some still come up, and performance is now worse because of the reduced chunk size.

Now I'm wondering:

a) Did I configure SparkConf() correctly? The errors were only significantly reduced after I reduced the chunk size.

b) Is S3 performance limited by requests per EC2 instance? If, instead of 3 medium instances, I used 6 smaller ones, would S3 be able to handle the 192 Parquet write tasks better just because they come from more instances?

Answer


  1. Your bandwidth to S3 depends on the VM type you rent: the better the network, the more bandwidth.
  2. You are throttled when writing to a specific bucket, or a shard of a bucket, in which case S3 sends a 503 response back for the client to handle.
  3. But what you are seeing here surfaces as a socket exception; it looks like the AWS transfer manager isn't handling it.
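
If the read timeouts are the immediate problem, the S3A client's socket timeouts and retry count can be turned up. A minimal sketch extending the SparkConf from the question; these are the standard Hadoop S3A property names, and the values are only illustrative, not recommendations:

# Illustrative S3A timeout/retry tuning; values are examples, not advice.
conf = conf \
    .set('spark.hadoop.fs.s3a.connection.timeout', '200000') \
    .set('spark.hadoop.fs.s3a.connection.establish.timeout', '50000') \
    .set('spark.hadoop.fs.s3a.attempts.maximum', '20') \
    .set('spark.hadoop.fs.s3a.connection.maximum', '100')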

If this is happening at task commit, it's because s3a mimics rename() with copy + delete, while the output committer expects rename to be an O(1) atomic operation rather than a slow one. More critically, that mimicking of rename depends on listing the files in S3, and because S3 is eventually consistent, the listing can miss files: you will lose data.

Unless you have a consistency layer atop S3 (s3mper, S3Guard), you should be committing to local HDFS for any sequence of queries, copying up to S3 only when you have the final output. Or, if you have time on your hands, help test HADOOP-13786, which adds a zero-rename committer.
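
A minimal sketch of that HDFS-first pattern; the staging path and bucket are illustrative:

# Stage the results on the cluster's local HDFS, where rename is fast
# and atomic (path is illustrative).
df.write.parquet('hdfs:///staging/result_parquet')

# Once the final output is complete, copy it up to S3 in one pass,
# e.g. with distcp from a shell on the master:
#   hadoop distcp hdfs:///staging/result_parquet s3a://mybucket/result_parquet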
