处理 Spark Streaming rdd 并存储到单个 HDFS 文件 [英] Process Spark Streaming rdd and store to single HDFS file

查看:25
本文介绍了处理 Spark Streaming rdd 并存储到单个 HDFS 文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

  1. 我正在使用 Kafka Spark Streaming 来获取流数据.

  1. I am using Kafka Spark Streaming to get streaming data.

val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)

  • 我正在使用这个 DStream 并处理 RDD

  • I am using this DStream and processing RDDs

    val output = lines.foreachRDD(rdd => 
            rdd.foreachPartition { partition => 
                partition.foreach { file => runConfigParser(file)}
    })
    

  • runConfigParser 是一种 JAVA 方法,它解析文件并生成我必须保存在 HDFS 中的输出.因此,多个节点将处理 RDD 并将输出写入一个单一的 HDFS 文件.因为我想在 HIVE 中加载这个文件.

  • runConfigParser is a JAVA method which parses a file and produces an output which i have to save in HDFS. So multiple nodes will process RDD and write output into one single HDFS file. As i want to load this fie in HIVE.

    我应该输出 runConfigParser 的结果并使用 sc.parallze(output).saveAsTextFile(path) 这样我的所有节点都会将 RDD 输出写入单个 HDFS 文件.?这种设计有效吗?

    should I output the result of runConfigParser and use sc.parallze(output).saveAsTextFile(path) such that all my nodes will write RDD outputs to single HDFS file.? Is this design efficient ?

    我将在 HIVE 中加载这个单一的 HDFS 文件(它将作为其流数据不断更新)并使用 Impala 进行查询.

    I will load this single HDFS file (which will be constantly updated as its streaming data) in HIVE and query using Impala.

    推荐答案

    您可以使用一个函数来合并"saveAsTextFile 的结果.像这样:

    You can use a function to "merge" the result of saveAsTextFile. Like this:

    import org.apache.hadoop.fs._
    
    def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
        val sourceFile = hdfsServer + "/tmp/" 
        rdd.saveAsTextFile(sourceFile)
        val dstPath = hdfsServer + "/final/" 
        merge(sourceFile, dstPath, fileName)
      }
    
      def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
        val hadoopConfig = new Configuration()
        val hdfs = FileSystem.get(hadoopConfig)
        val destinationPath = new Path(dstPath)
        if (!hdfs.exists(destinationPath)) {
          hdfs.mkdirs(destinationPath)
        }
        FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
      }
    

    这篇关于处理 Spark Streaming rdd 并存储到单个 HDFS 文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

  • 查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆