在 Spark 中访问数组列 [英] Access Array column in Spark

查看:35
本文介绍了在 Spark 中访问数组列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Spark DataFrame 包含一个 Array[Double] 类型的列.当我尝试在 map() 函数中取回它时,它会抛出 ClassCastException 异常.以下 Scala 代码生成异常.

A Spark DataFrame contains a column of type Array[Double]. It throw a ClassCastException exception when I try to get it back in a map() function. The following Scala code generate an exception.

case class Dummy( x:Array[Double] )
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3))))
val s = df.map( r => {
   val arr:Array[Double] = r.getAs[Array[Double]]("x")
   arr.sum
})
s.foreach(println)

例外是

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:24)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:23)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

有人向我解释为什么它不起作用?我应该怎么做?我使用的是 Spark 1.5.1 和 Scala 2.10.6

Cam somebody explain me why it does not work? what should I do instead? I am using Spark 1.5.1 and scala 2.10.6

谢谢

推荐答案

ArrayTypeRow 中表示为 scala.collection.mutable.WrappedArray.您可以使用例如

ArrayType is represented in a Row as a scala.collection.mutable.WrappedArray. You can extract it using for example

val arr: Seq[Double] = r.getAs[Seq[Double]]("x")

val i: Int = ???
val arr = r.getSeq[Double](i)

甚至:

import scala.collection.mutable.WrappedArray

val arr: WrappedArray[Double] = r.getAs[WrappedArray[Double]]("x")

如果 DataFrame 相对较薄,那么模式匹配可能是更好的方法:

If DataFrame is relatively thin then pattern matching could be a better approach:

import org.apache.spark.sql.Row

df.rdd.map{case Row(x: Seq[Double]) => (x.toArray, x.sum)}

尽管您必须记住,序列的类型是未选中的.

although you have to keep in mind that the type of the sequence is unchecked.

在 Spark >= 1.6 中,您还可以使用 Dataset 如下:

In Spark >= 1.6 you can also use Dataset as follows:

df.select("x").as[Seq[Double]].rdd

这篇关于在 Spark 中访问数组列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆