How to add a typesafe config file which is located on HDFS to spark-submit (cluster-mode)?


Problem description


I have a Spark (Spark 1.5.2) application that streams data from Kafka to HDFS. My application contains two Typesafe config files to configure certain things like Kafka topic etc.
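
For context, the application reads those settings through the standard Typesafe Config API. A minimal sketch of what that typically looks like, assuming illustrative key names and an object name (AppSettings, kafka.topic, etc.) that are not taken from the original project:

    import com.typesafe.config.{Config, ConfigFactory}

    object AppSettings {
      // ConfigFactory.load() resolves application.conf from the classpath and
      // falls back to reference.conf; the key names below are examples only.
      private val config: Config = ConfigFactory.load()

      val kafkaTopic: String     = config.getString("kafka.topic")
      val kafkaBrokers: String   = config.getString("kafka.brokers")
      val hdfsOutputPath: String = config.getString("hdfs.output.path")
    }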

Now I want to run my application with spark-submit (cluster mode) in a cluster. The jar file with all the dependencies of my project is stored on HDFS. As long as my config files are included in the jar file, everything works fine. But this is impractical for testing purposes because I always have to rebuild the jar.

Therefore I excluded the config files from my project and added them via "--driver-class-path". This worked in client mode, but if I move the config files to HDFS and run my application in cluster mode, it can't find the settings. Below you can find my spark-submit command:

/usr/local/spark/bin/spark-submit \
    --total-executor-cores 10 \
    --executor-memory 15g \
    --verbose \
    --deploy-mode cluster \
    --class com.hdp.speedlayer.SpeedLayerApp \
    --driver-class-path hdfs://iot-master:8020/user/spark/config \
    --master spark://spark-master:6066 \
    hdfs://iot-master:8020/user/spark/speed-layer-CONFIG.jar

I already tried it with the --files parameter, but that didn't work either. Does anybody know how I can fix this?

Update:

I did some further research and figured out that it could be related to the HDFS path. I changed the HDFS path to "hdfs:///iot-master:8020//user//spark//config", but unfortunately that didn't work either. Maybe this helps you.

Below you can also see the error I get when I run the driver program in cluster mode:

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ExceptionInInitializerError
    at com.speedlayer.SpeedLayerApp.main(SpeedLayerApp.scala)
    ... 6 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'application'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
...

Solution

Trying to achieve the same result, I found out the following:

  1. --files: applies only to local files on the machine running the spark-submit command and converts to conf.addFile(), so HDFS files won't work unless you are able to run hdfs dfs -get <....> beforehand to retrieve the file. In my case I want to run it from Oozie, so I don't know which machine it is going to run on, and I don't want to add a copy-file action to my workflow.
  2. The quote @Yuval_Itzchakov took refers to --jars, which only handles jars since it converts to conf.addJar().

So as far as I know, there is no straightforward way to load a configuration file from HDFS.

My approach was to pass the path to my app, read the configuration file there, and merge it into the reference file:

    import java.io.File
    import java.net.URI

    import com.typesafe.config.{Config, ConfigFactory}
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    private val HDFS_IMPL_KEY = "fs.hdfs.impl"

    // Copy the config file from HDFS to a local temp file, parse it and
    // merge it with the defaults from reference.conf via ConfigFactory.load.
    def loadConf(pathToConf: String): Config = {
      val path = new Path(pathToConf)
      val confFile = File.createTempFile(path.getName, "tmp")
      confFile.deleteOnExit()
      getFileSystemByUri(path.toUri).copyToLocalFile(path, new Path(confFile.getAbsolutePath))

      ConfigFactory.load(ConfigFactory.parseFile(confFile))
    }

    // Build a FileSystem for the given URI, explicitly using the HDFS implementation.
    def getFileSystemByUri(uri: URI): FileSystem = {
      val hdfsConf = new Configuration()
      hdfsConf.set(HDFS_IMPL_KEY, classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
      FileSystem.get(uri, hdfsConf)
    }
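
The HDFS path itself is then simply handed to the application, for example as a program argument appended after the application jar on the spark-submit command line. A usage sketch (the argument handling and the key name are assumptions, not part of the original answer):

    // Hypothetical entry point: args(0) carries the HDFS path of the config file,
    // e.g. hdfs://iot-master:8020/user/spark/config/application.conf
    def main(args: Array[String]): Unit = {
      val config = loadConf(args(0))
      val kafkaTopic = config.getString("kafka.topic") // example key, merged with reference.conf
      // ... start the streaming job using the resolved config ...
    }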

P.S. The error only means that ConfigFactory didn't find any configuration file, so it couldn't find the property you are looking for.
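
For what it's worth, the exception from the question can be reproduced in isolation: with no application.conf on the classpath, ConfigFactory.load() resolves only reference.conf (plus system properties), so any lookup under an application-specific root fails with exactly this message. A minimal sketch, with an illustrative key:

    import com.typesafe.config.ConfigFactory

    object MissingKeyDemo extends App {
      // Without an application.conf on the classpath, load() sees only reference.conf,
      // so this lookup throws:
      // com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'application'
      val config = ConfigFactory.load()
      config.getString("application.kafka.topic")
    }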
