Extracting `Seq[(String,String,String)]` from spark DataFrame


Problem Description


I have a Spark DF with rows of Seq[(String, String, String)]. I'm trying to do some kind of flatMap with it, but anything I try ends up throwing

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple3

I can take a single row or multiple rows from the DF just fine

df.map{ r => r.getSeq[Feature](1)}.first

returns

Seq[(String, String, String)] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location] .....

and the data type of the RDD seems correct.

org.apache.spark.rdd.RDD[Seq[(String, String, String)]]

The schema of the df is

root
 |-- article_id: long (nullable = true)
 |-- content_processed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lemma: string (nullable = true)
 |    |    |-- pos_tag: string (nullable = true)
 |    |    |-- ne_tag: string (nullable = true)

I know this problem is related to Spark SQL treating the RDD rows as org.apache.spark.sql.Row, even though it idiotically claims they're a Seq[(String, String, String)]. There's a related question (link below), but the answer to it doesn't work for me. I'm also not familiar enough with Spark to figure out how to turn it into a working solution.

Are the rows Row[Seq[(String, String, String)]], or Row[(String, String, String)], or Seq[Row[(String, String, String)]], or something even crazier?

I'm trying to do something like

df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1)

which appears to work, but doesn't: forcing evaluation with

df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1).first

throws the above error. So how am I supposed to (for instance) get the first element of the second tuple on each row?

Also, WHY has Spark been designed to do this? It seems idiotic to claim that something is of one type when in fact it isn't and cannot be converted to the claimed type.


Related question: GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table

Related bug report: http://search-hadoop.com/m/q3RTt2bvwy19Dxuq1&subj=ClassCastException+when+extracting+and+collecting+DF+array+column+type

Solution

Well, it doesn't claim that it is a tuple. It claims it is a struct which maps to Row:

import org.apache.spark.sql.Row

case class Feature(lemma: String, pos_tag: String, ne_tag: String)
case class Record(id: Long, content_processed: Seq[Feature])

val df = Seq(
  Record(1L, Seq(
    Feature("ancient", "jj", "o"),
    Feature("olympia_greece", "nn", "location")
  ))
).toDF

val content = df.select($"content_processed").rdd.map(_.getSeq[Row](0))
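
Inspecting the result shows plain Rows, which is why the tuple cast in the question blows up; the output shape matches the WrappedArray printout the question reports:

content.first
// Seq[org.apache.spark.sql.Row] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location])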

You'll find the exact mapping rules in the Spark SQL programming guide.

Since Row is not exactly a pretty structure, you'll probably want to map it to something useful:

content.map(_.map {
  case Row(lemma: String, pos_tag: String, ne_tag: String) => 
    (lemma, pos_tag, ne_tag)
})

or:

content.map(_.map ( row => (
  row.getAs[String]("lemma"),
  row.getAs[String]("pos_tag"),
  row.getAs[String]("ne_tag")
)))
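
Either way, once the Rows are mapped to tuples, the asker's concrete goal (the first element of the second tuple on each row) is plain Scala. A minimal sketch, where tuples names the result of either mapping above:

val tuples = content.map(_.map {
  case Row(lemma: String, pos_tag: String, ne_tag: String) =>
    (lemma, pos_tag, ne_tag)
})
tuples.map(_(1)._1).first  // "olympia_greece" for the sample record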

Finally, there's a slightly more concise approach with Datasets:

df.as[Record].rdd.map(_.content_processed)

or

df.select($"content_processed").as[Seq[(String, String, String)]]

although the second variant seems to be slightly buggy at the moment.
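
The first Dataset variant also composes directly into the flatMap the question was after. A sketch, assuming spark.implicits._ (or sqlContext.implicits._ on older Spark versions) is in scope:

df.as[Record]
  .flatMap(_.content_processed)               // Dataset[Feature]
  .map(f => (f.lemma, f.pos_tag, f.ne_tag))   // Dataset[(String, String, String)]
  .first                                      // (ancient,jj,o) for the sample data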

There is an important difference between the first approach (Row.getAs) and the second one (Dataset.as). The former extracts objects as Any and applies asInstanceOf. The latter uses encoders to transform between internal types and the desired representation.
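
To make that difference concrete, a hedged illustration against the example df above (exact error messages vary by Spark version):

// Unchecked cast: compiles, and only fails with a ClassCastException at
// runtime, when the Long in column 0 ("id") is actually used as a String
// df.first.getAs[String](0)

// Encoder: checked against the schema during analysis, so this fails
// immediately with something like "Cannot up cast `id` from bigint to string"
// df.select($"id").as[String]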
