Spark BlockManager running on localhost


Problem Description


I have a simple script file that I am trying to execute in the spark-shell; it mimics the tutorial here:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// Stop the SparkContext that spark-shell created automatically
sc.stop()

// Point the new context at the Mesos master and tell executors where the driver lives
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("mesos://zk://172.24.51.171:2181/mesos")
  .set("spark.executor.uri", "hdfs://172.24.51.171:8020/spark-1.3.0-bin-hadoop2.4.tgz")
  .set("spark.driver.host", "172.24.51.142")

val sc2 = new SparkContext(conf)

val file = sc2.textFile("hdfs://172.24.51.171:8020/input/pg4300.txt")
val errors = file.filter(line => line.contains("ERROR"))
errors.count()

My namenode and Mesos master are on 172.24.51.171; my IP address is 172.24.51.142. I have these lines saved to a file, which I then launch with the command:

/opt/spark-1.3.0-bin-hadoop2.4/bin/spark-shell -i WordCount.scala

My remote executors are all dying with errors similar to the following:

15/04/08 14:30:39 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks 
java.io.IOException: Failed to connect to localhost/127.0.0.1:48554
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:594)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:592)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:592)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:586)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.org$apache$spark$broadcast$TorrentBroadcast$$anonfun$$getRemote$1(TorrentBroadcast.scala:126)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: localhost/127.0.0.1:48554
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    ... 1 more 

This failure happens after I run the errors.count() command. Earlier in my shell, after I create the new SparkContext, I see the following lines:

15/04/08 14:31:18 INFO NettyBlockTransferService: Server created on 48554
15/04/08 14:31:18 INFO BlockManagerMaster: Trying to register BlockManager
15/04/08 14:31:18 INFO BlockManagerMasterActor: Registering block manager localhost:48554 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 48554)
15/04/08 14:31:18 INFO BlockManagerMaster: Registered BlockManager

I guess what's happening is that Spark is recording the address of the BlockManager as localhost:48554, which is then sent to all the executors, which each try to talk to localhost:48554 on their own machines instead of the driver's IP address on port 48554. Why is Spark using localhost as the address of the BlockManager rather than spark.driver.host?
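One thing worth trying, assuming the driver's own hostname resolution is the culprit: Spark's documented SPARK_LOCAL_IP environment variable overrides the address the driver resolves for itself. A minimal sketch, reusing the launch command from above (whether this changes the BlockManager registration is an assumption I have not verified):

# Force the driver to advertise its routable address instead of whatever
# the machine resolves for itself (172.24.51.142 is the driver box above)
export SPARK_LOCAL_IP=172.24.51.142
/opt/spark-1.3.0-bin-hadoop2.4/bin/spark-shell -i WordCount.scala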

Additional Information

  1. In the Spark configuration there is a spark.blockManager.port, but no spark.blockManager.host? There is only spark.driver.host, which, as you can see above, I set in my SparkConf (see the sketch after this list).

  2. Possibly related to this JIRA ticket, although that seemed like a network issue; my network's DNS is configured just fine.
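To make point 1 concrete, a minimal sketch of the only related knobs I can find in Spark 1.3; the port value is arbitrary, and pinning it does not change which host is advertised:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.blockManager.port", "48554")   // pins the BlockManager's port only
  .set("spark.driver.host", "172.24.51.142") // the only host-level setting available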

Solution

Can you try providing the Spark master address with the --master parameter when invoking spark-shell (or by adding it to spark-defaults.conf)? I had a similar issue (see my post Spark Shell listens on localhost instead of configured IP address), and it looks like the BlockManager listens on localhost when the context is created dynamically in the shell.
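For concreteness, a sketch of both options, reusing the Mesos master URL and paths from the question:

# Option 1: pass the master explicitly when launching the shell
/opt/spark-1.3.0-bin-hadoop2.4/bin/spark-shell \
  --master mesos://zk://172.24.51.171:2181/mesos \
  -i WordCount.scala

# Option 2: set it once in conf/spark-defaults.conf
spark.master   mesos://zk://172.24.51.171:2181/mesos

With the master supplied this way, the context the shell creates automatically should already point at Mesos, so the sc.stop()/new SparkContext dance becomes unnecessary.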

Logs:

  • When the original context is used (listens on the hostname): BlockManagerInfo: Added broadcast_1_piece0 in memory on ubuntu64server2:33301

  • When a new context is created (listens on localhost): BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:40235

I had to connect to a Cassandra cluster, and I was able to query it by providing spark.cassandra.connection.host in spark-defaults.conf and importing com.datastax.spark.connector._ in the spark-shell.
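A sketch of that setup; the Cassandra host, keyspace, and table names below are hypothetical, and it assumes the spark-cassandra-connector jar is already on the shell's classpath:

# conf/spark-defaults.conf (hypothetical Cassandra node)
spark.cassandra.connection.host   172.24.51.200

// Then inside the spark-shell:
import com.datastax.spark.connector._                    // adds cassandraTable to SparkContext
val rows = sc.cassandraTable("my_keyspace", "my_table")  // hypothetical keyspace/table
println(rows.count())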
