Spark batch-streaming of Kafka into single file

Question

I am streaming data from Kafka using batch streaming (maxRatePerPartition 10.000). So in each batch I process 10.000 kafka messages.

Within this batch run I process each message by creating a dataFrame out of the rdd. After processing, I save each processed record to the same file using: dataFrame.write.mode(SaveMode.Append). So it appends all messages to the same file.

This is ok as long as it is running within one batch run. But after the next batch run is executed (next 10.000 messages are processed) it creates a new file for the next 10.000 messages.

The problem is now: Each file (block) reserves 50mb of the file system but only contains around 1mb (10.000 messages). Instead of creating new files each batch run, I would prefer to have it all appended to one file as long as it is not exceeding the 50mb.

Do you know how to do this or why it is not working in my example? You can have a look at my code here:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable.Set


object SparkStreaming extends Constants {

  def main(args: Array[String]) {

    //create a new Spark configuration...
    val conf = new SparkConf()
      .setMaster("local[2]") // ...using 2 cores
      .setAppName("Streaming")
      .set("spark.streaming.kafka.maxRatePerPartition", "10000") //... processing max. 10000 messages per second

    //create a streaming context for micro batch
    val ssc = new StreamingContext(conf, Seconds(1)) //Note: processing max. 1*10000 messages (see config above.)

    //Set up Kafka DStream
    val kafkaParams = Map("metadata.broker.list" -> "sandbox.hortonworks.com:6667",
      "auto.offset.reset" -> "smallest") //Start from the beginning
    val kafkaTopics = Set(KAFKA_TOPIC_PARQUET)

    val directKafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,
      kafkaParams, kafkaTopics)

    val records = directKafkaStream.map(source => StreamingFunctions.transformAvroSource(source))

    records.foreachRDD((rdd: RDD[TimeseriesRddRecord], time: Time) => {
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) // Worker node singleton
      import sqlContext.implicits._

      val dataFrame = rdd.toDF()

      dataFrame.write.mode(SaveMode.Append).partitionBy(PARQUET_PARTITIONBY_COLUMNS: _*).parquet(PARQUET_FILE_PATH_TIMESERIES_LOCAL)
      println(s"Written entries: ${dataFrame.count()}")
    })

    //start streaming until the process is killed
    ssc.start()
    ssc.awaitTermination()
  }

  /** Case class for converting RDD to DataFrame */
  case class DataFrameRecord(thingId: String, timestamp: Long, propertyName: String, propertyValue: Double)

  /** Lazily instantiated singleton instance of SQLContext */
  object SQLContextSingleton {

    @transient private var instance: SQLContext = _

    def getInstance(sparkContext: SparkContext): SQLContext = {
      if (instance == null) {
        instance = new SQLContext(sparkContext)
      }
      instance
    }
  }

}

I would be happy to get your thoughts on that. Thanks, Alex

Answer

This can be accomplished by using the coalesce function and then by overwriting the existing file.
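
As a rough sketch of that idea (not the asker's or the answerer's actual code), the foreachRDD block could union each new batch with what is already on disk, coalesce the result to a single partition, and rewrite it to a staging location before swapping it in. PARQUET_FILE_PATH_TIMESERIES_LOCAL is the constant from the question; PARQUET_FILE_PATH_TIMESERIES_TMP is a hypothetical staging path introduced only for this example.

records.foreachRDD((rdd: RDD[TimeseriesRddRecord], time: Time) => {
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val newBatch = rdd.toDF()

  // Merge the new batch with the data already on disk; on the very first
  // batch there is nothing to read yet, so fall back to the new batch alone.
  val merged =
    try {
      sqlContext.read.parquet(PARQUET_FILE_PATH_TIMESERIES_LOCAL).unionAll(newBatch)
    } catch {
      case _: Throwable => newBatch
    }

  // Squeeze everything into one partition and rewrite it to a staging
  // location (PARQUET_FILE_PATH_TIMESERIES_TMP is hypothetical).
  merged.coalesce(1)
    .write.mode(SaveMode.Overwrite)
    .parquet(PARQUET_FILE_PATH_TIMESERIES_TMP)

  // The staging directory would then have to replace the original one
  // (e.g. via a filesystem rename) -- exactly the step that is fragile
  // when the job is aborted, as noted below.
})

Note that this rereads and rewrites the entire data set on every micro-batch, and the final swap of the staging directory is the fragile part discussed next.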

But as discussed in the thread Spark coalesce looses file when program is aborted (http://stackoverflow.com/questions/33364089/spark-coalesce-looses-file-when-program-is-aborted), this approach runs into errors when the program is interrupted.

So for the time being it does not seem sufficient for implementing such logic.
