How to convert List[Double] to Columns?


Question

I have a List[Double]. How can I convert it to an org.apache.spark.sql.Column? I am trying to insert it as a column into an existing DataFrame using .withColumn().

Solution

It cannot be done directly. A Column is not a data structure but a representation of a specific SQL expression; it is not bound to specific data. You'll have to transform your data first. One way to approach this is to parallelize the list and join by index:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DoubleType

val df = Seq(("a", 2), ("b", 1), ("c", 0)).toDF("x", "y")
val aList = List(1.0, -1.0, 0.0)

// Key both the DataFrame rows and the list by their position, join on that
// index, and append the list value to each row.
val rows = df.rdd.zipWithIndex.map(_.swap)
  .join(sc.parallelize(aList).zipWithIndex.map(_.swap))
  .values
  .map { case (row: Row, x: Double) => Row.fromSeq(row.toSeq :+ x) }

// Rebuild a DataFrame with the extended schema.
sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))
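
For a quick sanity check, the result of the createDataFrame call above can be assigned and inspected (a minimal sketch; the name withZ is just for illustration, and row order after an RDD join is not guaranteed to match the original input):

val withZ = sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))
withZ.printSchema()  // original columns x, y plus the appended double column z
withZ.show()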

Another, similar approach is to index the DataFrame and use a UDF to handle the rest:

import scala.util.Try
import org.apache.spark.sql.functions.udf

// Append a synthetic row index so each row can look up "its" value in the list.
val indexedDf = sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map {
    case (row: Row, i: Long) => Row.fromSeq(row.toSeq :+ i)
  },
  df.schema.add("idx_", "long")
)

// The UDF closes over the local vector; out-of-range indices yield null.
def addValue(vs: Vector[Double]) = udf((i: Long) => Try(vs(i.toInt)).toOption)

indexedDf.withColumn("z", addValue(aList.toVector)($"idx_"))
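
A side effect of wrapping the lookup in Try(...).toOption is that rows whose index falls outside the list simply get null instead of throwing. A small sketch of that behaviour (shortList is just an illustrative name):

// With a list shorter than the DataFrame, the remaining rows get null in "z".
val shortList = List(1.0)
indexedDf
  .withColumn("z", addValue(shortList.toVector)($"idx_"))
  .drop("idx_")
  .show()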

Unfortunately both solutions suffer from the same issues. First of all, passing local data through the driver introduces a serious bottleneck in your program; typically data should be accessed directly from the executors. Another problem is the growing RDD lineage if you want to perform this operation iteratively.

While the second issue can be addressed by checkpointing, the first one makes this idea useless in general. I would strongly suggest that you either build the complete structure first and read it in Spark, or rebuild your pipeline in a way that can leverage the Spark architecture. For example, if the data comes from an external source, perform the reads directly for each chunk of data using map / mapPartitions.
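
To illustrate that last point, here is a rough sketch of computing (or fetching) the extra value on the executors via mapPartitions instead of shipping a List[Double] from the driver; lookupValue is purely hypothetical and stands in for whatever external read you would actually perform:

// Each partition derives/fetches its own values, so nothing is funnelled
// through the driver.
val enriched = df.rdd.mapPartitions { rows =>
  // e.g. open a connection or file handle once per partition here
  def lookupValue(key: String): Double = key.length.toDouble  // placeholder logic
  rows.map(row => Row.fromSeq(row.toSeq :+ lookupValue(row.getString(0))))
}

sqlContext.createDataFrame(enriched, df.schema.add("z", DoubleType, false))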

