Spark batch-streaming of Kafka into a single file


Problem description

I am streaming data from Kafka using batch streaming (maxRatePerPartition = 10000), so in each batch I process 10,000 Kafka messages.

Within this batch run I process each message by creating a DataFrame out of the RDD. After processing, I save each processed record to the same file using dataFrame.write.mode(SaveMode.Append), so it appends all messages to the same file.

This is fine as long as it runs within one batch. But after the next batch run is executed (the next 10,000 messages are processed), it creates a new file for those 10,000 messages.

The problem is: each file (block) reserves 50 MB of the file system but only contains around 1 MB (10,000 messages). Instead of creating new files on each batch run, I would prefer to have everything appended to one file as long as it does not exceed 50 MB.

Do you know how to do this, or why it is not working in my example? You can have a look at my code here:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable.Set


object SparkStreaming extends Constants {


  def main(args: Array[String]) {

    //create a new Spark configuration...
    val conf = new SparkConf()
      .setMaster("local[2]") // ...using 2 cores
      .setAppName("Streaming")
      .set("spark.streaming.kafka.maxRatePerPartition", "10000") //... processing max. 10000 messages per partition and second

    //create a streaming context for micro batches
    val ssc = new StreamingContext(conf, Seconds(1)) //Note: processing max. 1*10000 messages (see config above)

    //Set up the Kafka DStream
    val kafkaParams = Map("metadata.broker.list" -> "sandbox.hortonworks.com:6667",
      "auto.offset.reset" -> "smallest") //Start from the beginning
    val kafkaTopics = Set(KAFKA_TOPIC_PARQUET)

    val directKafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,
      kafkaParams, kafkaTopics)

    val records = directKafkaStream.map(source => StreamingFunctions.transformAvroSource(source))

    records.foreachRDD((rdd: RDD[TimeseriesRddRecord], time: Time) => {
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) // Worker node singleton
      import sqlContext.implicits._

      val dataFrame = rdd.toDF()

      dataFrame.write.mode(SaveMode.Append).partitionBy(PARQUET_PARTITIONBY_COLUMNS: _*).parquet(PARQUET_FILE_PATH_TIMESERIES_LOCAL)
      println(s"Written entries: ${dataFrame.count()}")
    })

    //start streaming until the process is killed
    ssc.start()
    ssc.awaitTermination()
  }


  /** Case class for converting RDD to DataFrame */
  case class DataFrameRecord(thingId: String, timestamp: Long, propertyName: String, propertyValue: Double)


  /** Lazily instantiated singleton instance of SQLContext */
  object SQLContextSingleton {

    @transient private var instance: SQLContext = _

    def getInstance(sparkContext: SparkContext): SQLContext = {
      if (instance == null) {
        instance = new SQLContext(sparkContext)
      }
      instance
    }
  }

}

I would be happy to get your thoughts on that. Thanks, Alex

Recommended answer

This can be accomplished by using the coalesce function and then by overwriting the existing file.
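
As a rough, untested sketch of that suggestion only (assuming it runs as a separate compaction step between or after streaming batches, with a sqlContext in scope and PARQUET_FILE_PATH_TIMESERIES_LOCAL being the output path from the question), it amounts to something like:

import org.apache.spark.sql.SaveMode

// Read back the many small Parquet files written by the streaming batches.
val accumulated = sqlContext.read.parquet(PARQUET_FILE_PATH_TIMESERIES_LOCAL)

// Collapse everything into a single partition (one output file) and
// overwrite the existing directory. Reading from and then overwriting
// the same path is exactly the fragile part discussed below.
accumulated
  .coalesce(1)
  .write
  .mode(SaveMode.Overwrite)
  .parquet(PARQUET_FILE_PATH_TIMESERIES_LOCAL)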

But as discussed in the thread "Spark coalesce looses file when program is aborted", this leads to errors when the program is interrupted.

So for the time being it does not seem sufficient to implement such logic.
