Avoid writing files for empty partitions in Spark Streaming

Problem Description

I have a Spark Streaming job which reads data from Kafka partitions (one executor per partition).
I need to save the transformed values to HDFS, but I need to avoid creating empty files.
I tried using isEmpty, but this doesn't help when not all partitions are empty.

P.S. repartition is not an acceptable solution due to performance degradation.
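
For context, a minimal sketch of the kind of guard that does not solve the problem (the `stream` and `basePath` names are assumptions for illustration, not from the question): even when the whole micro-batch is non-empty, each partition still gets its own part file, so empty partitions still produce empty files.

  // Hypothetical sketch: isEmpty only guards the whole micro-batch.
  stream.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      // One part-NNNNN file is written per partition; partitions with no
      // records still produce empty files.
      rdd.map(_.value).saveAsTextFile(s"$basePath/${System.currentTimeMillis}")
    }
  }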

Recommended Answer

The code works for a PairRDD only. The key is LazyOutputFormat: it creates the underlying output file lazily, only when the first record is actually written, so partitions that produce no records produce no files.

Text code:

  import org.apache.hadoop.io.{NullWritable, Text}
  import org.apache.hadoop.mapreduce.OutputFormat
  import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, TextOutputFormat}

  val conf = ssc.sparkContext.hadoopConfiguration
  // Tell LazyOutputFormat which real OutputFormat to delegate to.
  conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
    classOf[TextOutputFormat[Text, NullWritable]],
    classOf[OutputFormat[Text, NullWritable]])

  kafkaRdd.map(_.value -> NullWritable.get)
    .saveAsNewAPIHadoopFile(basePath,
      classOf[Text],
      classOf[NullWritable],
      classOf[LazyOutputFormat[Text, NullWritable]],
      conf)
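
A possible way to wire this into the streaming job (the `stream` name and the per-batch output path are assumptions, not part of the original answer) is to run the save inside foreachRDD, deriving a unique output directory from the batch time:

  // Hedged sketch: per-batch output directory derived from the batch time.
  stream.foreachRDD { (kafkaRdd, time) =>
    kafkaRdd.map(_.value -> NullWritable.get)
      .saveAsNewAPIHadoopFile(s"$basePath/${time.milliseconds}",
        classOf[Text],
        classOf[NullWritable],
        classOf[LazyOutputFormat[Text, NullWritable]],
        conf)
  }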

Avro code:

  import org.apache.avro.mapred.AvroKey
  import org.apache.avro.mapreduce.AvroKeyOutputFormat
  import org.apache.hadoop.io.NullWritable
  import org.apache.hadoop.mapreduce.OutputFormat
  import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat
  import org.apache.spark.rdd.RDD

  val avro: RDD[(AvroKey[MyEvent], NullWritable)] = ....
  val conf = ssc.sparkContext.hadoopConfiguration

  // Schema of the Avro output key.
  conf.set("avro.schema.output.key", MyEvent.SCHEMA$.toString)
  // Tell LazyOutputFormat which real OutputFormat to delegate to.
  conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
    classOf[AvroKeyOutputFormat[MyEvent]],
    classOf[OutputFormat[AvroKey[MyEvent], NullWritable]])

  avro.saveAsNewAPIHadoopFile(basePath,
    classOf[AvroKey[MyEvent]],
    classOf[NullWritable],
    classOf[LazyOutputFormat[AvroKey[MyEvent], NullWritable]],
    conf)
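
As a hedged sketch of how the pair RDD above might be built from the Kafka records (`toMyEvent` is a hypothetical converter from the record value to the generated MyEvent Avro class, not from the original answer):

  // Hypothetical: wrap each converted event in an AvroKey, paired with NullWritable.
  val avro: RDD[(AvroKey[MyEvent], NullWritable)] =
    kafkaRdd.map(record => new AvroKey(toMyEvent(record.value)) -> NullWritable.get)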
