Spark DataFrame column names not passed to slave nodes?


Question

I'm applying a function, let's say f(), via the map method to the rows of a DataFrame (call it df), but I see a NullPointerException when calling collect on the resulting RDD if df.columns is passed as an argument to f().

The following Scala code, which can be pasted into a spark-shell, shows a minimal example of the issue (see the function prepRDD_buggy()). I've also posted my current workaround in the function prepRDD(), where the only difference is that the column names are passed via a val instead of as df.columns.

Can some Spark expert please point out the precise reason why this is happening or confirm our hypothesis that slave nodes do not get DataFrame column names?

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// A Simple DataFrame
val dataRDD: RDD[Row] = sc.parallelize(Array(
  Row(1.0,2.1,3.3),
  Row(3.4,5.9,8.9),
  Row(3.1,2.3,4.1)))
val struct: StructType = StructType(
  StructField("y", DoubleType, false) ::
  StructField("x1", DoubleType, false) ::
  StructField("x2", DoubleType, false) :: Nil)
val df: DataFrame = sqlContext.createDataFrame(dataRDD, struct)

// Make LabeledPoint object from Row objects
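// (the colnames argument is not used in the body; it is passed only to reproduce the issue)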
def makeLP(row: Row, colnames: Array[String]) =
  LabeledPoint(row.getDouble(0), 
    Vectors.dense((1 until row.length).toArray map (i => row.getDouble(i))))

// Make RDD[LabeledPoint] from DataFrame
def prepRDD_buggy(df: DataFrame): RDD[LabeledPoint] = {
  df map (row => makeLP(row, df.columns))
}
val mat_buggy = prepRDD_buggy(df) 
mat_buggy.collect // throws NullPointerException !

// Make RDD[LabeledPoint] from DataFrame
def prepRDD(df: DataFrame): RDD[LabeledPoint] = {
  val cnames = df.columns
  df map (row => makeLP(row, cnames))
}
val mat = prepRDD(df) 
mat.collect // Works fine

Here are the first few lines of the (very verbose) error message that I see when running mat_buggy.collect inside my spark-shell.

15/12/24 18:09:28 INFO SparkContext: Starting job: collect at <console>:42
15/12/24 18:09:28 INFO DAGScheduler: Got job 0 (collect at <console>:42) with 2 output partitions
15/12/24 18:09:28 INFO DAGScheduler: Final stage: ResultStage 0(collect at <console>:42)
15/12/24 18:09:28 INFO DAGScheduler: Parents of final stage: List()
15/12/24 18:09:28 INFO DAGScheduler: Missing parents: List()
15/12/24 18:09:28 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at map at <console>:38), which has no missing parents
15/12/24 18:09:28 INFO MemoryStore: ensureFreeSpace(11600) called with curMem=0, maxMem=560993402
15/12/24 18:09:28 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 11.3 KB, free 535.0 MB)
15/12/24 18:09:28 INFO MemoryStore: ensureFreeSpace(4540) called with curMem=11600, maxMem=560993402
15/12/24 18:09:28 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.4 KB, free 535.0 MB)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.10.98:53386 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:28 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
15/12/24 18:09:28 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at map at <console>:38)
15/12/24 18:09:28 INFO YarnScheduler: Adding task set 0.0 with 2 tasks
15/12/24 18:09:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, ip-10-10-10-217.ec2.internal, PROCESS_LOCAL, 2385 bytes)
15/12/24 18:09:28 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, ip-10-10-10-213.ec2.internal, PROCESS_LOCAL, 2385 bytes)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-10-10-213.ec2.internal:56642 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-10-10-217.ec2.internal:56396 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:29 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-10-10-217.ec2.internal): java.lang.NullPointerException
    at org.apache.spark.sql.DataFrame.schema(DataFrame.scala:290)
    at org.apache.spark.sql.DataFrame.columns(DataFrame.scala:306)
    at $line34.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$prepRDD_buggy$1.apply(<console>:38)
    at $line34.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$prepRDD_buggy$1.apply(<console>:38)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)

Answer

Your hypothesis is correct. columns requires access to the schema, and the schema depends on queryExecution, which is transient and hence won't be shipped to the workers. So what you're doing in prepRDD is more or less correct, although the same information can be extracted directly from the rows:

scala> df.rdd.map(_.schema.fieldNames).first
res14: Array[String] = Array(y, x1, x2)
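
For example, a version of prepRDD that reads the names from each Row's attached schema (just a sketch in the same spirit as your code, not a drop-in replacement):

// Sketch: take the column names from the Row itself instead of from df.columns
def prepRDD_alt(df: DataFrame): RDD[LabeledPoint] = {
  df map (row => makeLP(row, row.schema.fieldNames))
}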

On a side note, VectorAssembler plus a simple map would be a better choice here.
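
A minimal sketch of that alternative, assuming the Spark 1.x APIs used above (where ml.feature.VectorAssembler outputs org.apache.spark.mllib.linalg vectors) and the column names from your DataFrame:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vector

// Assemble the feature columns into a single vector column, then map each row to a LabeledPoint
val assembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2"))
  .setOutputCol("features")

val mat2: RDD[LabeledPoint] = assembler.transform(df)
  .select("y", "features")
  .map { case Row(y: Double, features: Vector) => LabeledPoint(y, features) }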
