Extracting `Seq[(String,String,String)]` from spark DataFrame
I have a spark DF with rows of Seq[(String, String, String)]. I'm trying to do some kind of flatMap with it, but anything I try ends up throwing
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple3
I can take a single row or multiple rows from the DF just fine
df.map{ r => r.getSeq[Feature](1)}.first
returns
Seq[(String, String, String)] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location] .....
and the data type of the RDD seems correct.
org.apache.spark.rdd.RDD[Seq[(String, String, String)]]
The schema of the df is
root
|-- article_id: long (nullable = true)
|-- content_processed: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- lemma: string (nullable = true)
| | |-- pos_tag: string (nullable = true)
| | |-- ne_tag: string (nullable = true)
I know this problem is related to spark sql treating the RDD rows as org.apache.spark.sql.Row even though they idiotically say that it's a Seq[(String, String, String)]. There's a related question (link below), but the answer to that question doesn't work for me. I'm also not familiar enough with spark to figure out how to turn it into a working solution.
Are the rows Row[Seq[(String, String, String)]] or Row[(String, String, String)] or Seq[Row[(String, String, String)]] or something even crazier?
I'm trying to do something like
df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1)
which appears to work, but actually doesn't:
df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1).first
throws the above error. So how am I supposed to (for instance) get the first element of the second tuple on each row?
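For reference, on a plain Scala Seq[(String, String, String)] (no Spark involved), the access itself is straightforward; a minimal sketch with made-up data matching the example above:

```scala
// Pure Scala, no Spark: what _(1)._1 means on a real Seq of tuples.
object TupleAccessSketch {
  val row: Seq[(String, String, String)] = Seq(
    ("ancient", "jj", "o"),
    ("olympia_greece", "nn", "location")
  )

  // Second tuple of the row, then its first element.
  def secondFirst: String = row(1)._1

  def main(args: Array[String]): Unit =
    println(secondFirst) // prints "olympia_greece"
}
```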
Also, WHY has spark been designed to do this? It seems idiotic to claim that something is of one type when in fact it isn't and cannot be converted to the claimed type.
Related question: GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table
Related bug report: http://search-hadoop.com/m/q3RTt2bvwy19Dxuq1&subj=ClassCastException+when+extracting+and+collecting+DF+array+column+type
Well, it doesn't claim that it is a tuple. It claims it is a struct which maps to Row:
import org.apache.spark.sql.Row
// assumes the DataFrame implicits are in scope, as in the shell
// (spark.implicits._ on Spark 2.x, sqlContext.implicits._ on 1.x)

case class Feature(lemma: String, pos_tag: String, ne_tag: String)
case class Record(id: Long, content_processed: Seq[Feature])

val df = Seq(
  Record(1L, Seq(
    Feature("ancient", "jj", "o"),
    Feature("olympia_greece", "nn", "location")
  ))
).toDF

// each array element comes back as a Row, not a Tuple3
val content = df.select($"content_processed").rdd.map(_.getSeq[Row](0))
You'll find exact mapping rules in the Spark SQL programming guide.
Since Row is not exactly a pretty structure, you'll probably want to map it to something more useful:
content.map(_.map {
case Row(lemma: String, pos_tag: String, ne_tag: String) =>
(lemma, pos_tag, ne_tag)
})
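The same conversion idea can be sketched in plain Scala, with Seq[Any] standing in for Row's untyped fields (a simplification; a real Row is not a Seq). Once the records are pattern-matched into typed tuples, the access from the original question works as expected:

```scala
// Pure Scala sketch: pattern-match untyped records into typed tuples.
object MatchSketch {
  val rawRows: Seq[Seq[Any]] = Seq(
    Seq("ancient", "jj", "o"),
    Seq("olympia_greece", "nn", "location")
  )

  // Extract each field with a typed pattern, producing real Tuple3 values.
  val typed: Seq[(String, String, String)] = rawRows.map {
    case Seq(lemma: String, posTag: String, neTag: String) =>
      (lemma, posTag, neTag)
  }

  def main(args: Array[String]): Unit =
    println(typed(1)._1) // prints "olympia_greece"
}
```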
or:
content.map(_.map ( row => (
row.getAs[String]("lemma"),
row.getAs[String]("pos_tag"),
row.getAs[String]("ne_tag")
)))
Finally, a slightly more concise approach with Datasets:
df.as[Record].rdd.map(_.content_processed)
or
df.select($"content_processed").as[Seq[(String, String, String)]]
although this seems to be slightly buggy at the moment.
There is an important difference between the first approach (Row.getAs) and the second one (Dataset.as). The former extracts objects as Any and applies asInstanceOf; the latter uses encoders to transform between internal types and the desired representation.
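The failure mode from the question can be reproduced in plain Scala: because of type erasure, a cast like the one behind getSeq[Feature] appears to succeed, and the ClassCastException only surfaces when an element is actually used as the claimed tuple type. A minimal sketch, no Spark involved:

```scala
// Pure Scala sketch of the erasure trap behind the original error.
object ErasureSketch {
  // At runtime this is a Seq[String], but erasure lets the cast to
  // Seq[(String, String, String)] go through without any immediate check.
  val pretendTuples: Seq[(String, String, String)] =
    Seq("ancient", "jj", "o").asInstanceOf[Seq[(String, String, String)]]

  // Only touching an element as a Tuple3 triggers the ClassCastException.
  def firstElementFails: Boolean =
    try { pretendTuples(0)._1; false }
    catch { case _: ClassCastException => true }

  def main(args: Array[String]): Unit =
    println(firstElementFails) // prints "true"
}
```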