流程星火RDD流和存储单一HDFS文件 [英] Process Spark Streaming rdd and store to single HDFS file

查看:207
本文介绍了流程星火RDD流和存储单一HDFS文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述


  1. 我使用的卡夫卡星火流获取流数据。

  1. I am using Kafka Spark Streaming to get streaming data.

val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)


  • 我用这DSTREAM和处理RDDS

  • I am using this DStream and processing RDDs

    val output = lines.foreachRDD(rdd => 
            rdd.foreachPartition { partition => 
                partition.foreach { file => runConfigParser(file)}
    })
    


  • runConfigParser 是分析文件和产生一个输出,我在HDFS保存Java方法。因此,多个节点将处理RDD和输出写入到一个单一的HDFS文件。因为我想加载这个FIE在HIVE。

  • runConfigParser is a JAVA method which parses a file and produces an output which i have to save in HDFS. So multiple nodes will process RDD and write output into one single HDFS file. As i want to load this fie in HIVE.

    我应该输出 runConfigParser 的结果,并使用 sc.parallze(输出).saveAsTextFile(路径)这样的我所有的节点会写RDD输出到单个文件的HDFS?这是设计的效率?

    should I output the result of runConfigParser and use sc.parallze(output).saveAsTextFile(path) such that all my nodes will write RDD outputs to single HDFS file.? Is this design efficient ?

    我将使用帕拉加载HIVE此单HDFS文件(其将不断更新作为其流数据)和查询。

    I will load this single HDFS file (which will be constantly updated as its streaming data) in HIVE and query using Impala.

    推荐答案

    您可以使用一个函数来合并 saveAsTextFile 的结果。像这样的:

    You can use a function to "merge" the result of saveAsTextFile. Like this:

    import org.apache.hadoop.fs._
    
    def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
        val sourceFile = hdfsServer + "/tmp/" 
        rdd.saveAsTextFile(sourceFile)
        val dstPath = hdfsServer + "/final/" 
        merge(sourceFile, dstPath, fileName)
      }
    
      def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
        val hadoopConfig = new Configuration()
        val hdfs = FileSystem.get(hadoopConfig)
        val destinationPath = new Path(dstPath)
        if (!hdfs.exists(destinationPath)) {
          hdfs.mkdirs(destinationPath)
        }
        FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
      }
    

    这篇关于流程星火RDD流和存储单一HDFS文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

  • 查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆