如何在mapPartitions内的org.apache.spark.sql.Row中添加列 [英] How to add columns into org.apache.spark.sql.Row inside of mapPartitions

查看:446
本文介绍了如何在mapPartitions内的org.apache.spark.sql.Row中添加列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是scala和spark的新手,请记住这一点:)

I am a newbie at scala and spark, please keep that in mind :)

实际上,我有三个问题

  1. 如果我要创建带有少量附加列的新行,应该如何定义将其传递给df.rdd.mapPartitions的函数
  2. 如何在Row对象中添加几列(或创建一个新列)
  3. 如何从创建的RDD创建DataFrame

先谢谢您

推荐答案

通常不需要这样做,最好使用UDF,但您在这里:

Usually there should be no need for that and it is better to use UDFs but here you are:

如果我要创建带有少量附加列的新行,应该如何定义将其传递给df.rdd.mapPartitions的函数

How should I define function to pass it into df.rdd.mapPartitions, if I want to create new Row with few additional columns

应该使用Iterator[Row]并返回Iterator[T],因此在您的情况下,您应该使用类似的内容

It should take Iterator[Row] and return Iterator[T] so in your case you should use something like this

import org.apache.spark.sql.Row

def transformRows(iter: Iterator[Row]): Iterator[Row] = ???

如何向Row对象添加几列(或创建一个新列)

How can I add few columns into Row object(or create a new one)

有多种访问Row值的方法,包括Row.get*方法,Row.toSeq等.可以使用Row.applyRow.fromSeqRow.fromTupleRowFactory创建新的Row.例如:

There are multiple ways of accessing Row values including Row.get* methods, Row.toSeq etc. New Row can be created using Row.apply, Row.fromSeq, Row.fromTuple or RowFactory. For example:

def transformRow(row: Row): Row =  Row.fromSeq(row.toSeq ++ Array[Any](-1, 1))

如何从创建的RDD创建DataFrame

How create DataFrame from created RDD

如果拥有RDD[Row],则可以使用SQLContext.createDataFrame并提供架构.

If you have RDD[Row] you can use SQLContext.createDataFrame and provide schema.

将所有内容放在一起:

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val  df = sc.parallelize(Seq(
    (1.0, 2.0), (0.0, -1.0),
    (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)

val newSchema = StructType(df.schema.fields ++ Array(
  StructField("z", IntegerType, false), StructField("v", IntegerType, false)))

sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

// +---+----+---+---+
// |  x|   y|  z|  v|
// +---+----+---+---+
// |1.0| 2.0| -1|  1|
// |0.0|-1.0| -1|  1|
// |3.0| 4.0| -1|  1|
// |6.0|-2.3| -1|  1|
// +---+----+---+---+

这篇关于如何在mapPartitions内的org.apache.spark.sql.Row中添加列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆