How to convert List[Double] to Columns?
Question
I have a List[Double]; how can I convert it to an org.apache.spark.sql.Column? I am trying to insert it as a column into an existing DataFrame using .withColumn().
Answer
It cannot be done directly. Column is not a data structure but a representation of a specific SQL expression; it is not bound to specific data. You'll have to transform your data first. One way to approach this is to parallelize the list and join by index:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DoubleType

val df = Seq(("a", 2), ("b", 1), ("c", 0)).toDF("x", "y")
val aList = List(1.0, -1.0, 0.0)

// Key both RDDs by row index, join them, and append the list value to each row.
val rows = df.rdd.zipWithIndex.map(_.swap)
  .join(sc.parallelize(aList).zipWithIndex.map(_.swap))
  .values
  .map { case (row: Row, x: Double) => Row.fromSeq(row.toSeq :+ x) }

sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))
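For reference, materializing the result should give the original columns plus z. A quick check (the orderBy is only for readable output, since the join does not guarantee row order):

val withZ = sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))
withZ.orderBy("x").show()
// +---+---+----+
// |  x|  y|   z|
// +---+---+----+
// |  a|  2| 1.0|
// |  b|  1|-1.0|
// |  c|  0| 0.0|
// +---+---+----+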
Another similar approach is to index the DataFrame and use a UDF to handle the rest:
import scala.util.Try
import org.apache.spark.sql.functions.udf

// Append a zero-based index column to each row.
val indexedDf = sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map {
    case (row: Row, i: Long) => Row.fromSeq(row.toSeq :+ i)
  },
  df.schema.add("idx_", "long")
)

// Look up the list value by index; Try turns an out-of-range index into null.
def addValue(vs: Vector[Double]) = udf((i: Long) => Try(vs(i.toInt)).toOption)

indexedDf.withColumn("z", addValue(aList.toVector)($"idx_"))
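If the index column is only scaffolding, it can be dropped once the value is attached (a small follow-up, not part of the original answer; note that z is nullable because the UDF returns an Option):

indexedDf
  .withColumn("z", addValue(aList.toVector)($"idx_"))
  .drop("idx_")
  .show()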
Unfortunately, both solutions suffer from the same issues. First of all, passing local data through the driver introduces a serious bottleneck in your program; typically, data should be accessed directly from the executors. Another problem is the growing RDD lineage if you want to perform this operation iteratively.
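The lineage problem can be mitigated by checkpointing, which the next paragraph touches on. A minimal sketch, assuming a checkpoint directory is available (the path here is illustrative):

sc.setCheckpointDir("/tmp/spark-checkpoints")  // illustrative path, adjust as needed

rows.checkpoint()  // mark the joined RDD from above for checkpointing
rows.count()       // an action forces the checkpoint, truncating the lineage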
While the second issue can be addressed by checkpointing, the first one makes this idea useless in general. I would strongly suggest that you either build the complete structure first and read it in Spark, or rebuild your pipeline in a way that leverages the Spark architecture. For example, if the data comes from an external source, perform the reads directly for each chunk of data using map / mapPartitions.
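As a rough illustration of the latter, each partition can fetch its own chunk so nothing is funneled through the driver. This is only a sketch; SomeExternalClient and fetchRecord are hypothetical stand-ins for whatever client your source provides:

// Distribute the keys, then read the corresponding records on the executors.
val keys = sc.parallelize(0 until 1000, numSlices = 8)

val fetched = keys.mapPartitions { iter =>
  val client = new SomeExternalClient()      // hypothetical per-partition client
  iter.map(k => (k, client.fetchRecord(k)))  // hypothetical per-key read
}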