如何在不从DataFrame进行转换和访问的情况下将列添加到Dataset? [英] How to add a column to Dataset without converting from a DataFrame and accessing it?

查看:111
本文介绍了如何在不从DataFrame进行转换和访问的情况下将列添加到Dataset?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我知道使用.withColumn()UDF向Spark DataSet添加新列的方法,该方法返回一个DataFrame.我也知道,我们可以将结果DataFrame转换为DataSet.

I am aware of method to add a new column to a Spark DataSet using .withColumn() and a UDF, which returns a DataFrame. I am also aware that, we can convert the resulting DataFrame to a DataSet.

我的问题是:

  1. 如果我们仍然遵循传统的DF方法(即,将列名作为UDF输入的字符串传递),那么DataSet的类型安全如何在这里发挥作用
  2. 是否像以前使用RDD一样,以一种面向对象的方式"访问列(不将列名作为字符串传递),以追加新列.
  3. 如何在常规操作(如地图,过滤器等)中访问新列?

例如:

    scala> case class Temp(a : Int, b : String)    //creating case class
    scala> val df = Seq((1,"1str"),(2,"2str),(3,"3str")).toDS    // creating DS
    scala> val appendUDF = udf( (b : String) => b + "ing")      // sample UDF

    scala> df.withColumn("c",df("b"))   // adding a new column
    res5: org.apache.spark.sql.DataFrame = [a: int, b: string ... 1 more field]

    scala> res5.as[Temp]   // converting to DS
    res6: org.apache.spark.sql.Dataset[Temp] = [a: int, b: string ... 1 more field]

    scala> res6.map( x =>x.  
    // list of autosuggestion :
    a   canEqual   equals     productArity     productIterator   toString   
    b   copy       hashCode   productElement   productPrefix 

我无法使用.withColumn()添加的新列c,因为列c不在案例类Temp(仅包含ab)的情况下使用res5.as[Temp]将其转换为DS的瞬间.

the new column c, that i have added using .withColumn() is not accessible, Because column c is not in the case class Temp (it contains only a & b) at the instant when it is converted to DS using res5.as[Temp].

如何访问列c?

推荐答案

Dataset的类型安全的世界中,您会将一个结构映射到另一个结构中.

In the type-safe world of Datasets you'd map an structure into another.

也就是说,对于每个转换,我们都需要数据的模式表示(如RDD所需要的).要访问上面的"c",我们需要创建一个提供对它的访问权限的新模式.

That is, for each transformation, we need schema representations of the data (as it is needed for RDDs). To access 'c' above, we need to create a new schema that provides access to it.

case class A(a:String)
case class BC(b:String, c:String)
val f:A => BC = a=> BC(a.a,"c") // Transforms an A into a BC

val data = (1 to 10).map(i => A(i.toString))
val dsa = spark.createDataset(data)
// dsa: org.apache.spark.sql.Dataset[A] = [a: string]

val dsb = dsa.map(f)
//dsb: org.apache.spark.sql.Dataset[BC] = [b: string, c: string]

这篇关于如何在不从DataFrame进行转换和访问的情况下将列添加到Dataset?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆