通过键 Spark 写入多个输出 - 一项 Spark 作业 [英] Write to multiple outputs by key Spark - one Spark job

查看:22
本文介绍了通过键 Spark 写入多个输出 - 一项 Spark 作业的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何在单个作业中使用 Spark 写入依赖于键的多个输出.

How can you write to multiple outputs dependent on the key using Spark in a single Job.

相关:写入多个输出通过关键 Scalding Hadoop,一个 MapReduce 作业

例如

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

将确保 cat prefix/1

a
b

cat prefix/2 将是

c

我最近添加了一个新答案,其中包括完整导入、皮条客和压缩编解码器,请参阅 https://stackoverflow.com/a/46118044/1586965,除了先前的答案之外,这可能会有所帮助.

I've recently added a new answer that includes full imports, pimp and compression codec, see https://stackoverflow.com/a/46118044/1586965, which may be helpful in addition to the earlier answers.

推荐答案

这包括请求的编解码器、必要的导入和请求的皮条客.

This includes the codec as requested, necessary imports, and pimp as requested.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
    .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

与 OP 的一个细微差别是它将 = 前缀到目录名称.例如

One subtle difference to the OP is that it will prefix <keyName>= to the directory names. E.g.

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

会给:

prefix/key=1/part-00000
prefix/key=2/part-00000

其中 prefix/my_number=1/part-00000 将包含行 ab,以及 prefix/my_number=2/part-00000 将包含行 c.

where prefix/my_number=1/part-00000 would contain the lines a and b, and prefix/my_number=2/part-00000 would contain the line c.

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")

会给:

prefix/foo=1/part-00000
prefix/foo=2/part-00000

应该清楚如何编辑parquet.

最后是 Dataset 的示例,它可能比使用元组更好.

Finally below is an example for Dataset, which is perhaps nicer that using Tuples.

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
    .format("text").option("codec", codec).save(prefix)
  }
}

这篇关于通过键 Spark 写入多个输出 - 一项 Spark 作业的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆