DataFrame-ified zipWithIndex

Problem description

I am trying to solve the age-old problem of adding a sequence number to a data set. I am working with DataFrames, and there appears to be no DataFrame equivalent to RDD.zipWithIndex. On the other hand, the following works more or less the way I want it to:

// Prepend a sequence number by round-tripping through the RDD API.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val origDF = sqlContext.load(...)

val seqDF = sqlContext.createDataFrame(
    origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
    StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)

In my actual application, origDF won't be loaded directly out of a file -- it is going to be created by joining 2-3 other DataFrames together and will contain upwards of 100 million rows.

Is there a better way to do this? What can I do to optimize it?

Recommended answer

Since Spark 1.6 there is a function called monotonically_increasing_id().
It generates a new column with a unique 64-bit monotonic index for each row.
But the index isn't consecutive: each partition starts a new range, so we must calculate each partition's offset before using it.
Trying to provide an "rdd-free" solution, I ended up with some collect(), but it only collects the offsets, one value per partition, so it will not cause OOM.
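
To make the "each partition starts a new range" point concrete, here is a minimal sketch (my own toy example, assuming the question's sqlContext and a two-partition DataFrame; not part of the original answer):

import sqlContext.implicits._
import org.apache.spark.sql.functions.monotonically_increasing_id

val demo = (1 to 6).toDF("value").repartition(2)
demo.withColumn("raw_id", monotonically_increasing_id()).show()
// Rows in partition 0 typically get 0, 1, 2, while rows in partition 1 start at
// 8589934592 (the partition id shifted into the upper bits), so the raw ids have gaps.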

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = {
    // Tag every row with its partition id and a per-partition monotonic id.
    val dfWithPartitionId = df
        .withColumn("partition_id", spark_partition_id())
        .withColumn("inc_id", monotonically_increasing_id())

    // One value per partition: the global offset to add to that partition's inc_id.
    val partitionOffsets = dfWithPartitionId
        .groupBy("partition_id")
        .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
        .orderBy("partition_id")
        .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt")
        .collect()
        .map(_.getLong(0))
        .toArray

    // Look up each row's partition offset and add it to the per-partition id.
    dfWithPartitionId
        .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
        .withColumn(indexName, col("partition_offset") + col("inc_id"))
        .drop("partition_id", "partition_offset", "inc_id")
}
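
A hypothetical usage sketch (ordersDF, customersDF and the output path are placeholders I made up, standing in for the joined DataFrames mentioned in the question):

val joined = ordersDF.join(customersDF, Seq("customer_id"))   // placeholder join
val indexed = zipWithIndex(joined, offset = 1, indexName = "seq")
indexed.write.parquet("/tmp/indexed_output")                  // illustrative output path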

This solution doesn't repack the original rows and doesn't repartition the original huge DataFrame, so it is quite fast in the real world: 200 GB of CSV data (43 million rows with 150 columns) was read, indexed, and written to Parquet in 2 minutes on 240 cores.
After testing my solution, I ran Kirk Broadhurst's solution, and it was 20 seconds slower.
You may or may not want to use dfWithPartitionId.cache(), depending on the task.
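
If caching does help for a given job, one possible variant (my assumption, not part of the original answer) is to cache the tagged DataFrame inside zipWithIndex, since it is scanned twice, once to compute the offsets and once to attach them:

// Hypothetical tweak to the first statement of zipWithIndex: persist the tagged rows
// because the intermediate DataFrame is reused for both the offset computation and the
// final projection.
val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())
    .cache()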
