How to convert List[Double] to Columns?
Question
I have a List[Double]. How can I convert it to an org.apache.spark.sql.Column? I am trying to insert it as a column into an existing DataFrame using .withColumn().
Accepted answer
It cannot be done directly. A Column is not a data structure but a representation of a specific SQL expression; it is not bound to any particular data. You will have to transform your data first. One way to approach this is to parallelize the list and join by index:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DoubleType

val df = Seq(("a", 2), ("b", 1), ("c", 0)).toDF("x", "y")
val aList = List(1.0, -1.0, 0.0)

// Pair each row and each list element with its index, join on the index,
// then append the matching list value to the end of each row.
val rows = df.rdd.zipWithIndex.map(_.swap)
  .join(sc.parallelize(aList).zipWithIndex.map(_.swap))
  .values
  .map { case (row: Row, x: Double) => Row.fromSeq(row.toSeq :+ x) }

sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))
Another similar approach is to index the DataFrame and use a UDF to handle the rest:
import scala.util.Try
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Append a synthetic row index so the UDF can look up the matching list element.
val indexedDf = sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map {
    case (row: Row, i: Long) => Row.fromSeq(row.toSeq :+ i)
  },
  df.schema.add("idx_", "long")
)

// Returns None (a null in the column) when the index is out of range.
def addValue(vs: Vector[Double]) = udf((i: Long) => Try(vs(i.toInt)).toOption)

indexedDf.withColumn("z", addValue(aList.toVector)($"idx_"))
Unfortunately, both solutions suffer from the same issues. First of all, passing local data through the driver introduces a serious bottleneck into your program; normally data should be accessed directly from the executors. Another problem is the growing RDD lineage if you want to perform this operation iteratively.
While the second issue can be addressed by checkpointing, the first one makes this idea useless in general. I would strongly suggest that you either build the complete structure first and read it in Spark, or rebuild your pipeline in a way that can leverage the Spark architecture. For example, if the data comes from an external source, perform the reads directly for each chunk of data using map/mapPartitions.