Why does a Spark application on YARN fail with FetchFailedException due to Connection refused?


Problem description

I am using Spark version 1.6.3 with YARN version 2.7.1.2.3, which ships with HDP-2.3.0.0-2557. Because the Spark version bundled with the HDP release I use is too old, I prefer to use another Spark installation remotely in YARN mode.

Here is how I run the Spark shell:

./spark-shell --master yarn-client

Everything seems fine: sparkContext and sqlContext are initialized, and I can even access my Hive tables. But in some cases it runs into trouble when it tries to connect to the block managers.

I am not an expert, but I think that when I run in YARN mode the block managers are running on my YARN cluster. At first this looked like a network problem to me, and I did not want to ask about it here. However, it only happens in some cases that I have not been able to pin down yet, which makes me think it may not be a network problem.

Here is the code:

def df = sqlContext.sql("select * from city_table")

The code below works fine:

df.limit(10).count()

But when the size is more than 10 (I am not sure of the exact threshold; it changes on every run), it fails:

df.count()

This throws an exception:

16/12/30 07:31:04 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 157 bytes
16/12/30 07:31:19 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 8, 172.27.247.204): FetchFailed(BlockManagerId(2, 172.27.247.204, 56093), shuffleId=2, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /172.27.247.204:56093
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /172.27.247.204:56093
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: java.net.ConnectException: Connection refused: /172.27.247.204:56093
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more

)

I have just realised that this happens when there is more than one task to shuffle.

What is the problem? Is it a performance issue, or another network issue that I cannot see? What is that shuffling? If it is a network issue, is it between my Spark and YARN, or a problem within YARN itself?
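
For what it is worth, the shuffling here is the stage boundary Spark inserts for the global aggregation behind count(): each executor writes partial counts, and a reducer task then has to fetch those shuffle blocks from the other executors' block managers, which is exactly the connection that is being refused. A minimal sketch, run in the same spark-shell session (Spark 1.6), that makes the shuffle visible in the physical plan:

// explain() prints the physical plan; an "Exchange"/"TungstenExchange"
// operator marks a shuffle boundary, i.e. where reducer tasks must fetch
// blocks from other executors' block managers.
val df = sqlContext.sql("select * from city_table")

// count() is a global aggregation, so its plan shows per-partition partial
// counts followed by an exchange and a final count.
df.groupBy().count().explain()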

Thanks.

I have just noticed something in the logs:

17/01/02 06:45:17 INFO DAGScheduler: Executor lost: 2 (epoch 13)
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, 172.27.247.204, 51809)
17/01/02 06:45:17 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
17/01/02 06:45:17 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
17/01/02 06:45:24 INFO BlockManagerMasterEndpoint: Registering block manager 172.27.247.204:51809 with 511.1 MB RAM, BlockManagerId(2, 172.27.247.204, 51809)

Sometimes retrying it on another block manager works, but because the maximum allowed number of attempts (4 by default) gets exceeded, most of the time it never finishes.
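
Separately from that stage-level retry limit, the low-level fetch retries are configurable: Spark 1.6 documents spark.shuffle.io.maxRetries (default 3) and spark.shuffle.io.retryWait (default 5s) for the shuffle fetcher. A sketch of raising them when launching the shell; this could only help if the failures are transient, not if there is a real connectivity problem:

./spark-shell --master yarn-client \
  --conf spark.shuffle.io.maxRetries=10 \
  --conf spark.shuffle.io.retryWait=15s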

YARN is really silent about this, but I think it is a network issue, and I have managed to narrow the problem down somewhat:

This Spark is deployed outside the HDP environment. When Spark submits an application to YARN, YARN informs the Spark driver about the block managers and executors. The executors are data nodes in the HDP cluster and have different IPs on its private network. But when it comes to informing the Spark driver outside the cluster, YARN reports the same single IP for all executors, because all nodes in the HDP cluster go out through a router with the same IP. Suppose that IP is 150.150.150.150: when the Spark driver needs to connect to an executor and ask it for something, it tries this IP. But this IP is actually the external IP address of the whole cluster, not the IP of an individual data node.
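
If that theory is right, every block-manager address the driver has recorded should resolve to that single routed IP instead of the data nodes' private addresses. A quick diagnostic sketch (not a fix) from the same spark-shell session:

// Keys are the "host:port" of each executor's BlockManager as seen by the driver.
// If every entry shows the same external IP, the driver is indeed being handed
// the routed address it later fails to connect to when fetching shuffle blocks.
sc.getExecutorMemoryStatus.keys.foreach(println)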

Is there a way to make YARN report the executors (block managers) with their private IPs? Their private IPs are also reachable from the machine this Spark driver is running on.

Recommended answer

A FetchFailedException is thrown when a reducer task (for a ShuffleDependency) could not fetch shuffle blocks. It usually means that the executor (holding the BlockManager with the shuffle blocks) died, hence the exception:

Caused by: java.io.IOException: Failed to connect to /172.27.247.204:56093

The executor could have been OOMed (i.e. an OutOfMemoryError was thrown) or YARN decided to kill it due to excessive memory usage.
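
If the logs do point at memory (an OutOfMemoryError in the executor log, or YARN reporting a container running beyond its memory limits), a common mitigation is to give the executors more heap and more off-heap headroom at launch. A sketch, assuming Spark 1.6 on YARN where the overhead setting is spark.yarn.executor.memoryOverhead (in MB); the right values depend on your cluster:

./spark-shell --master yarn-client \
  --executor-memory 4g \
  --conf spark.yarn.executor.memoryOverhead=1024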

You should review the logs of the Spark application using the yarn logs command and find out the root cause of the issue.

yarn logs -applicationId <application ID> [options]
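
For example (the application id below is just a placeholder; use the one YARN printed when the shell started), grepping the aggregated logs for the usual suspects narrows things down quickly:

yarn logs -applicationId application_1483000000000_0001 | grep -iE "outofmemoryerror|killing container|beyond physical memory"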

You could also review the status of your Spark application's executors in the Executors tab of the web UI.

Spark usually recovers from a FetchFailedException by re-running the affected tasks. Use the web UI to see how your Spark application performs; the FetchFailedException could be due to a temporary memory "hiccup".

