How to convert List[Double] to Columns?


Question

I have a List[Double]. How can I convert it to an org.apache.spark.sql.Column? I am trying to insert it as a column into an existing DataFrame using .withColumn().

Answer

It cannot be done directly. A Column is not a data structure but a representation of a specific SQL expression; it is not bound to specific data. You'll have to transform your data first. One way to approach this is to parallelize the list and join by index:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DoubleType
import sqlContext.implicits._  // for toDF and the $-column syntax

val df = Seq(("a", 2), ("b", 1), ("c", 0)).toDF("x", "y")
val aList = List(1.0, -1.0, 0.0)

// Key both the DataFrame rows and the list elements by positional index,
// join on that index, and append the list value to each row.
val rows = df.rdd.zipWithIndex.map(_.swap)
  .join(sc.parallelize(aList).zipWithIndex.map(_.swap))
  .values
  .map { case (row: Row, x: Double) => Row.fromSeq(row.toSeq :+ x) }

sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))
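
A quick way to sanity-check the result (a sketch; the orderBy is only there because row order is not guaranteed after an RDD join):

sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))
  .orderBy("x")
  .show()
// Roughly:
// +---+---+----+
// |  x|  y|   z|
// +---+---+----+
// |  a|  2| 1.0|
// |  b|  1|-1.0|
// |  c|  0| 0.0|
// +---+---+----+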

Another similar approach is to index the DataFrame and use a UDF to handle the rest:

import scala.util.Try
import org.apache.spark.sql.functions.udf

// Append a positional index column to the original DataFrame.
val indexedDf = sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map {
    case (row: Row, i: Long) => Row.fromSeq(row.toSeq :+ i)
  },
  df.schema.add("idx_", "long")
)

// Look up the list element by index; Try(...).toOption maps
// out-of-range indices to None, which Spark renders as null.
def addValue(vs: Vector[Double]) = udf((i: Long) => Try(vs(i.toInt)).toOption)

indexedDf.withColumn("z", addValue(aList.toVector)($"idx_"))
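
Because the UDF returns an Option, rows whose index falls outside the list's range get null instead of raising an exception. A minimal sketch of that behavior, using a deliberately short list:

indexedDf.withColumn("z", addValue(Vector(1.0))($"idx_"))
  .drop("idx_")
  .show()
// Roughly:
// +---+---+----+
// |  x|  y|   z|
// +---+---+----+
// |  a|  2| 1.0|
// |  b|  1|null|
// |  c|  0|null|
// +---+---+----+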

Unfortunately, both solutions suffer from the same issues. First of all, passing local data through the driver introduces a serious bottleneck in your program; typically, data should be accessed directly from the executors. Another problem is the growing RDD lineage if you want to perform this operation iteratively.

While the second issue can be addressed by checkpointing, the first one makes this idea useless in general. I would strongly suggest that you either build the complete structure first and read it in Spark, or rebuild your pipeline in a way that can leverage the Spark architecture. For example, if the data comes from an external source, perform the reads directly for each chunk of data using map / mapPartitions, as in the sketch below.
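
A minimal sketch of that last suggestion. The key column "x" and the lookupValue function are hypothetical placeholders, not part of the original answer; in practice lookupValue would wrap a real external read (a database or key-value client opened inside mapPartitions, once per partition rather than once per row):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DoubleType

// Hypothetical per-key lookup; a dummy stand-in for a real external read.
def lookupValue(key: String): Double = key.hashCode.toDouble

val enrichedRows = df.rdd.mapPartitions { iter =>
  // Open any non-serializable client here, once per partition.
  iter.map { row =>
    Row.fromSeq(row.toSeq :+ lookupValue(row.getAs[String]("x")))
  }
}

val enriched = sqlContext.createDataFrame(
  enrichedRows,
  df.schema.add("z", DoubleType, false)
)

This way each executor fetches or computes the values for its own rows, so no local data has to pass through the driver.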
