Generic iterator over dataframe (Spark/scala)


Problem Description


I need to iterate over a data frame in a specific order and apply some complex logic to calculate a new column.

In the example below I'll be using a simple expression where the current value of s is the product of all previous values, so it may seem like this could be done with a UDF or even an analytic function. In reality, however, the logic is much more complex.
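
For the toy expression alone, a window (analytic) function would indeed be enough. A minimal sketch of that alternative, assuming the df defined below and covering only the simple running product, not the real logic:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, udf}

// Running product of y per x, ordered by y: collect_list gathers the values
// seen so far in the frame and a small UDF multiplies them (there is no
// built-in product aggregate in older Spark versions).
val w = Window.partitionBy($"x").orderBy($"y")
         .rowsBetween(Window.unboundedPreceding, Window.currentRow)
val product = udf((ys: Seq[Int]) => ys.product)
df.withColumn("s", product(collect_list($"y").over(w))).show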

The code below does what is needed:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val q = """
select 10 x, 1 y
union all select 10, 2
union all select 10, 3
union all select 20, 6
union all select 20, 4
union all select 20, 5
"""
val df = spark.sql(q)
def f_row(iter: Iterator[Row]) : Iterator[Row] = {
  iter.scanLeft(Row(0,0,1)) {
    case (r1, r2) => {
      val (x1, y1, s1) = r1 match {case Row(x: Int, y: Int, s: Int) => (x, y, s)}
      val (x2, y2)     = r2 match {case Row(x: Int, y: Int) => (x, y)}
      Row(x2, y2, s1 * y2)
    }
  }.drop(1)
}
val schema = new StructType().
             add(StructField("x", IntegerType, true)).
             add(StructField("y", IntegerType, true)).
             add(StructField("s", IntegerType, true))
val encoder = RowEncoder(schema)
df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show

Output

scala> df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show
+---+---+---+
|  x|  y|  s|
+---+---+---+
| 20|  4|  4|
| 20|  5| 20|
| 20|  6|120|
| 10|  1|  1|
| 10|  2|  2|
| 10|  3|  6|
+---+---+---+

What I do not like about it:

1) I explicitly define the schema, even though Spark can infer the names and types for the data frame:

scala> df
res1: org.apache.spark.sql.DataFrame = [x: int, y: int]

2) If I add any new column to the data frame then I have to declare the schema again and, what is more annoying, re-define the function!

Assume there is a new column z in the data frame. In this case I have to change almost every line in f_row:

def f_row(iter: Iterator[Row]) : Iterator[Row] = {
  iter.scanLeft(Row(0,0,"",1)) {
    case (r1, r2) => {
      val (x1, y1, z1, s1) = r1 match {case Row(x: Int, y: Int, z: String, s: Int) => (x, y, z, s)}
      val (x2, y2, z2)     = r2 match {case Row(x: Int, y: Int, z: String) => (x, y, z)}
      Row(x2, y2, z2, s1 * y2)
    }
  }.drop(1)
}
val schema = new StructType().
             add(StructField("x", IntegerType, true)).
             add(StructField("y", IntegerType, true)).
             add(StructField("z", StringType, true)).
             add(StructField("s", IntegerType, true))
val encoder = RowEncoder(schema)
df.withColumn("z", lit("dummy")).repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show

Output

scala> df.withColumn("z", lit("dummy")).repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show
+---+---+-----+---+
|  x|  y|    z|  s|
+---+---+-----+---+
| 20|  4|dummy|  4|
| 20|  5|dummy| 20|
| 20|  6|dummy|120|
| 10|  1|dummy|  1|
| 10|  2|dummy|  2|
| 10|  3|dummy|  6|
+---+---+-----+---+

Is there a way to implement the logic in a more generic way, so that I do not need to create a separate function for every specific data frame? Or at least to avoid code changes when columns that are not used in the calculation logic are added to the data frame?

Please see the updated question below.

Update

Below are two options for iterating in a more generic way, but both still have drawbacks.

// option 1
def f_row(iter: Iterator[Row]): Iterator[Row] = {
  val r = Row.fromSeq(Row(0, 0).toSeq :+ 1)
  iter.scanLeft(r)((r1, r2) => 
    Row.fromSeq(r2.toSeq :+ r1.getInt(r1.size - 1) * r2.getInt(r2.fieldIndex("y")))
  ).drop(1)
}
df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show

// option 2
def f_row(iter: Iterator[Row]): Iterator[Row] = {
  iter.map{
    var s = 1
    r => {
      s = s * r.getInt(r.fieldIndex("y"))
      Row.fromSeq(r.toSeq :+ s)
    }
  }
}
df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show

If a new column is added to the data frame then the initial value for iter.scanLeft has to be changed in Option 1. Also, I do not really like Option 2 because it uses a mutable var.

Is there a way to improve the code so that it is purely functional and no changes are needed when a new column is added to the data frame?

Solution

Well, a sufficient solution is below. The trick is to seed scanLeft with the first row of the partition, extended with its own y value, so that neither the input schema nor an initial Row literal has to be hard-coded:

def f_row(iter: Iterator[Row]): Iterator[Row] = {
  if (iter.hasNext) {
    val head = iter.next
    val r = Row.fromSeq(head.toSeq :+ head.getInt(head.fieldIndex("y")))
    iter.scanLeft(r)((r1, r2) => 
      Row.fromSeq(r2.toSeq :+ r1.getInt(r1.size - 1) * r2.getInt(r2.fieldIndex("y"))))
  } else iter
}
val encoder = 
  RowEncoder(StructType(df.schema.fields :+ StructField("s", IntegerType, false)))
df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show

Update

Functions like getInt can be avoided in favor of the more generic getAs.

Also, in order to access the fields of r1 by name, we can generate a GenericRowWithSchema, which is a subclass of Row.
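
A tiny standalone illustration of the difference (just a sketch with made-up values, using the column names from the example):

import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

// A plain Row built with Row(...) carries no schema, so name-based access
// like getAs[Int]("y") would throw; attaching a schema makes it work.
val rowSchema = StructType(Seq(StructField("x", IntegerType), StructField("y", IntegerType)))
val row = new GenericRowWithSchema(Array[Any](10, 2), rowSchema)
row.getAs[Int]("y")   // 2 -- resolved by name instead of position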

An implicit parameter has been added to f_row so that the function can use the current schema of the data frame while still being usable as a parameter of mapPartitions.

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.encoders.RowEncoder

implicit val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
implicit val encoder = RowEncoder(schema)

def mul(x1: Int, x2: Int) = x1 * x2

def f_row(iter: Iterator[Row])(implicit currentSchema : StructType) : Iterator[Row] = {
  if (iter.hasNext) {
    val head = iter.next
    val r =
      new GenericRowWithSchema((head.toSeq :+ (head.getAs("y"))).toArray, currentSchema)

    iter.scanLeft(r)((r1, r2) =>
      new GenericRowWithSchema((r2.toSeq :+ mul(r1.getAs("result"), r2.getAs("y"))).toArray, currentSchema))
  } else iter
}

df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row).show

Finally, the logic can be implemented in a tail-recursive manner:

import scala.annotation.tailrec

def f_row(iter: Iterator[Row]) = {
  @tailrec
  def f_row_(iter: Iterator[Row], tmp: Int, result: Iterator[Row]): Iterator[Row] = {
    if (iter.hasNext) {
      val r = iter.next
      f_row_(iter, mul(tmp, r.getAs("y")),
        result ++ Iterator(Row.fromSeq(r.toSeq :+ mul(tmp, r.getAs("y")))))
    } else result
  }
  f_row_(iter, 1, Iterator[Row]())
}

df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row).show
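
As one more purely functional variation (just a sketch along the same lines, reusing the implicit encoder defined above), the running product can also be threaded through scanLeft on (Option[Row], Int) pairs, which avoids both the head-peeking and a hard-coded initial Row:

import org.apache.spark.sql.Row

// The Option marks the seed element, which collect then drops; the Int
// carries the running product without any mutable state.
def f_row(iter: Iterator[Row]): Iterator[Row] =
  iter.scanLeft((Option.empty[Row], 1)) { case ((_, s), r) =>
    (Some(r), s * r.getAs[Int]("y"))
  }.collect { case (Some(r), s) => Row.fromSeq(r.toSeq :+ s) }

df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row).show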
