How to add columns into org.apache.spark.sql.Row inside of mapPartitions
Question
I am a newbie at scala and spark, please keep that in mind :)
Actually, I have three questions:
- How should I define the function to pass into df.rdd.mapPartitions, if I want to create a new Row with a few additional columns?
- How can I add a few columns into a Row object (or create a new one)?
- How do I create a DataFrame from the created RDD?
Thank you in advance
Answer
Usually there should be no need for that and it is better to use UDFs, but here you are:
How should I define a function to pass into df.rdd.mapPartitions, if I want to create a new Row with a few additional columns?
It should take an Iterator[Row] and return an Iterator[T], so in your case you should use something like this:
import org.apache.spark.sql.Row
def transformRows(iter: Iterator[Row]): Iterator[Row] = ???
How can I add a few columns into a Row object (or create a new one)?
There are multiple ways of accessing Row values, including the Row.get* methods, Row.toSeq, etc. A new Row can be created using Row.apply, Row.fromSeq, Row.fromTuple, or RowFactory. For example:
def transformRow(row: Row): Row = Row.fromSeq(row.toSeq ++ Array[Any](-1, 1))
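As a side-by-side sketch, the constructors mentioned above all build equivalent rows (the values here are arbitrary, chosen only for illustration; this assumes only spark-sql on the classpath, no SparkContext needed):

```scala
import org.apache.spark.sql.{Row, RowFactory}

// Four equivalent ways to build the same three-field Row:
val r1 = Row(1.0, 2.0, -1)                      // Row.apply(values: Any*)
val r2 = Row.fromSeq(Seq(1.0, 2.0, -1))         // from a Seq[Any]
val r3 = Row.fromTuple((1.0, 2.0, -1))          // from a Product / tuple
val r4 = RowFactory.create(                     // Java-friendly factory (takes boxed Objects)
  Double.box(1.0), Double.box(2.0), Int.box(-1))
```

All four compare equal, since Row equality is defined over the contained values.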
How do I create a DataFrame from the created RDD?
If you have an RDD[Row] you can use SQLContext.createDataFrame and provide the schema.
Putting it all together:
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val df = sc.parallelize(Seq(
  (1.0, 2.0), (0.0, -1.0),
  (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)

val newSchema = StructType(df.schema.fields ++ Array(
  StructField("z", IntegerType, false), StructField("v", IntegerType, false)))

sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
// +---+----+---+---+
// | x| y| z| v|
// +---+----+---+---+
// |1.0| 2.0| -1| 1|
// |0.0|-1.0| -1| 1|
// |3.0| 4.0| -1| 1|
// |6.0|-2.3| -1| 1|
// +---+----+---+---+
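As noted at the top of the answer, when the new columns do not require low-level row manipulation, the usual route is column expressions or a UDF rather than mapPartitions. A minimal sketch of that alternative, assuming the Spark 2.x+ SparkSession API (in the 1.x API used above, sqlContext works the same way; `signOfSum` is a hypothetical example function, not part of the original question):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, udf}

val spark = SparkSession.builder.master("local[*]").appName("add-columns").getOrCreate()
import spark.implicits._

val df = Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3)).toDF("x", "y")

// Constant columns need only lit:
val withConstants = df.withColumn("z", lit(-1)).withColumn("v", lit(1))

// Row-dependent values can go through a UDF:
val signOfSum = udf((x: Double, y: Double) => if (x + y >= 0) 1 else -1)
val withUdf = df.withColumn("s", signOfSum($"x", $"y"))
```

Unlike the mapPartitions version, this keeps the schema updated for you, so there is no need to build a new StructType by hand.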