Apache Spark: network errors between executors


Problem description

I'm running Apache Spark 1.3.1 on Scala 2.11.2, and when running on an HPC cluster with large enough data, I get numerous errors like the ones at the bottom of my post (repeated multiple times per second, until the job gets killed for being over time). Based on the errors, the executor is attempting to get shuffle data from other nodes but is unable to do so.

This same program executes fine with either (a) a smaller amount of data, or (b) in local-only mode, so it has something to do with the data getting sent over the network (and isn't triggered with a very small amount of data).

The code that is being executed around the time this happens is as follows:

val partitioned_data = data  // data was read as sc.textFile(inputFile)
  .zipWithIndex.map(x => (x._2, x._1))
  .partitionBy(partitioner)  // A custom partitioner
  .map(_._2)

// Force previous lazy operations to be evaluated. Presumably adds some
// overhead, but hopefully the minimum possible...
// Suggested on Spark user list: http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-RDD-computation-with-something-else-than-count-td707.html
sc.runJob(partitioned_data, (iter: Iterator[_]) => {})

Is this indicative of a bug, or is there something I'm doing wrong?

Here's a small snippet of the stderr log of one of the executors (full log is here):

15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=26501223, length=6227612}} to /10.0.0.5:41160; closing connection
java.io.IOException: Resource temporarily unavailable
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516)
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=1}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/27/shuffle_0_5_0.data, offset=3792987, length=2862285}} to /10.0.0.5:41160; closing connection
java.nio.channels.ClosedChannelException
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593002, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=0, length=10993212}} to /10.0.0.6:42426; closing connection
java.io.IOException: Resource temporarily unavailable
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516)
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 WARN TransportChannelHandler: Exception in connection from node5.someuniversity.edu/10.0.0.5:60089
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233)
    at sun.nio.ch.IOUtil.read(IOUtil.java:206)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
    at io.netty.buffer.PooledHeapByteBuf.setBytes(PooledHeapByteBuf.java:234)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from node5.someuniversity.edu/10.0.0.5:60089 is closed
15/04/21 14:59:28 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 outstanding blocks after 5000 ms

Solution

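This appears to be a bug related to the Netty networking system (block transfer service), added in Spark 1.2 (https://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/%3CCABPQxssL04Q+rBLtP-D8W+z3aTn+g-Um6GMdGDnh-hZcvD-c_g@mail.gmail.com%3E). Adding .set("spark.shuffle.blockTransferService", "nio") to my SparkConf fixed the problem, so now everything works perfectly.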
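For reference, here is a minimal sketch of where that setting would go, assuming a standalone driver program on the Spark 1.x line from the question. The object name and application name are hypothetical, and the master URL is assumed to be supplied by spark-submit. The "nio" value belongs to the Spark 1.x releases discussed here, and later releases may not honor it.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver object; only the .set(...) line comes from the answer above.
object NioShuffleWorkaround {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("nio-shuffle-workaround") // hypothetical application name
      // Use the older NIO block transfer service instead of Netty for shuffle fetches.
      .set("spark.shuffle.blockTransferService", "nio")

    val sc = new SparkContext(conf)
    try {
      // Build and run the job here, e.g. the partitionBy pipeline from the question.
    } finally {
      sc.stop() // release cluster resources even if the job fails
    }
  }
}
```

The property has to be set before the SparkContext is created, since changes made to a SparkConf afterwards are not picked up by the running context.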

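I found a post on the spark-user mailing list (https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAFn+d95xVv4vNz0sX+uH=yM2o6ZvApAA9J5WE_r=JC6siXHj-Q@mail.gmail.com%3E) from someone who was running into similar errors, and they suggested trying nio instead of Netty.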

SPARK-5085 is similar, in that changing from Netty to nio fixed their issue; however, they were also able to fix the issue by changing some networking settings. (I haven't tried this myself yet, since I'm not sure I have the access privileges to do so on the cluster.)
