Filter a Spark DataFrame on a row field that is an array of strings
Question
Using Spark 1.5 and Scala 2.10.6
I'm trying to filter a DataFrame on a field "tags" that is an array of strings, looking for all rows that have the tag 'private'.
val report = df.select("*")
  .where(df("tags").contains("private"))
Getting:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'Contains(tags, private)' due to data type mismatch: argument 1 requires string type, however, 'tags' is of array type.;
Is the filter method better suited?
UPDATED:
The data is coming from a Cassandra adapter, but a minimal example that shows what I'm trying to do (and also gets the above error) is:
def testData(sc: SparkContext): DataFrame = {
  val stringRDD = sc.parallelize(Seq(
    """{ "name": "ed",
        "tags": ["red", "private"]
    }""",
    """{ "name": "fred",
        "tags": ["public", "blue"]
    }"""))
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._
  sqlContext.read.json(stringRDD)
}
def run(sc: SparkContext) {
  val df1 = testData(sc)
  df1.show()
  val report = df1.select("*")
    .where(df1("tags").contains("private"))
  report.show()
}
UPDATED: the tags array can be any length and the 'private' tag can be in any position
UPDATED: one solution that works is a UDF:
import scala.collection.mutable
import org.apache.spark.sql.functions.udf

val filterPriv = udf { (tags: mutable.WrappedArray[String]) => tags.contains("private") }
val report = df1.filter(filterPriv(df1("tags")))
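The predicate wrapped by the UDF is ordinary Scala, which makes it easy to sanity-check outside Spark. A minimal sketch (plain Scala, no Spark required; the names and tags mirror the question's sample rows):

```scala
// Plain-Scala sketch of the predicate inside the UDF (no Spark required).
// The names and tags are the sample rows from the question.
val rows = Seq(
  ("ed",   Seq("red", "private")),
  ("fred", Seq("public", "blue"))
)

// Per row, the UDF asks: does the tags array contain "private"?
val priv = rows.filter { case (_, tags) => tags.contains("private") }
println(priv.map(_._1))  // List(ed)
```

Spark applies the same membership test per row when the UDF runs inside `filter`.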
Recommended answer
I think if you use where(array_contains(...)) it will work. Here's my result:
scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame
scala> def testData (sc: SparkContext): DataFrame = {
| val stringRDD = sc.parallelize(Seq
| ("""{ "name": "ned", "tags": ["blue", "big", "private"] }""",
| """{ "name": "albert", "tags": ["private", "lumpy"] }""",
| """{ "name": "zed", "tags": ["big", "private", "square"] }""",
| """{ "name": "jed", "tags": ["green", "small", "round"] }""",
| """{ "name": "ed", "tags": ["red", "private"] }""",
| """{ "name": "fred", "tags": ["public", "blue"] }"""))
| val sqlContext = new org.apache.spark.sql.SQLContext(sc)
| import sqlContext.implicits._
| sqlContext.read.json(stringRDD)
| }
testData: (sc: org.apache.spark.SparkContext)org.apache.spark.sql.DataFrame
scala>
| val df = testData (sc)
df: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]
scala> val report = df.select ("*").where (array_contains (df("tags"), "private"))
report: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]
scala> report.show
+------+--------------------+
| name| tags|
+------+--------------------+
| ned|[blue, big, private]|
|albert| [private, lumpy]|
| zed|[big, private, sq...|
| ed| [red, private]|
+------+--------------------+
Note that it works if you write where(array_contains(df("tags"), "private")), but if you write where(df("tags").array_contains("private")) (more directly analogous to what you wrote originally) it fails with array_contains is not a member of org.apache.spark.sql.Column. Looking at the source code for Column, I see there's some stuff to handle contains (constructing a Contains instance for that) but not array_contains. Maybe that's an oversight.
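One more nuance worth keeping in mind: Contains is a substring predicate on strings, while array_contains is exact membership in an array, so the two are not interchangeable even where the types line up. A plain-Scala analogy (hypothetical data, no Spark needed):

```scala
// Substring search and exact array membership can disagree.
val tags = Seq("semiprivate", "red")

val asSubstring = tags.mkString(",").contains("private")  // true: "semiprivate" contains "private"
val asMember    = tags.contains("private")                // false: no element equals "private"

println((asSubstring, asMember))
```

This is why the type error above is doing you a favor: silently stringifying the array and running Contains on it would match tags like "semiprivate".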