Spark Dataframe: Select distinct rows
Problem description
I tried two ways to get distinct rows from a Parquet file, but neither seems to work.
Attempt 1:
Dataset<Row> df = sqlContext.read().parquet("location.parquet").distinct();
But it throws:
Cannot have map type columns in DataFrame which calls set operations
(intersect, except, etc.),
but the type of column canvasHashes is map<string,string>;;
Attempt 2: I tried running an SQL query:
Dataset<Row> df = sqlContext.read().parquet("location.parquet");
df.createOrReplaceTempView("df");
Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");
The error I get:
== SQL ==
SELECT distinct on timestamp * from df
-----------------------------^^^
Is there a way to get distinct records while reading Parquet files? Is there any read option I can use?
Recommended answer
The problem you face is stated explicitly in the exception message: MapType columns are neither hashable nor orderable, so they cannot be used as part of a grouping or partitioning expression, and distinct() has to compare every column of the row, canvasHashes included.
Your SQL attempt is not logically equivalent to distinct on a Dataset (and DISTINCT ON is PostgreSQL syntax that the Spark SQL parser does not accept, hence the parse error). If you want to deduplicate data based on a set of compatible columns, you should use dropDuplicates:
df.dropDuplicates("timestamp")
which is equivalent to:
SELECT timestamp, first(c1) AS c1, first(c2) AS c2, ..., first(cn) AS cn,
first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp
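To make the semantics concrete, here is a minimal plain-Java model (no Spark involved) of what dropDuplicates("timestamp") does: rows are grouped by the key column and one row per key survives. The class and method names (DropDuplicatesModel, dropDuplicatesBy) and the sample rows are hypothetical. The model deterministically keeps the first row it sees; in Spark you should not rely on which row within a group is kept, much like first(...) in the SQL above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class DropDuplicatesModel {

    // Keeps one row per key, modelling df.dropDuplicates("timestamp").
    // This model keeps the first row encountered; Spark makes no such promise.
    static <R, K> List<R> dropDuplicatesBy(List<R> rows, Function<R, K> key) {
        Map<K, R> firstPerKey = new LinkedHashMap<>();
        for (R row : rows) {
            firstPerKey.putIfAbsent(key.apply(row), row);
        }
        return new ArrayList<>(firstPerKey.values());
    }

    public static void main(String[] args) {
        // Hypothetical rows: [timestamp, c1, canvasHashes rendered as text]
        List<List<String>> rows = List.of(
            List.of("t1", "a", "{x=1}"),
            List.of("t1", "b", "{x=2}"),
            List.of("t2", "c", "{x=3}"));
        List<List<String>> deduped = dropDuplicatesBy(rows, r -> r.get(0));
        System.out.println(deduped); // [[t1, a, {x=1}], [t2, c, {x=3}]]
    }
}
```

Note that the second "t1" row is discarded even though its other columns differ: deduplicating on a subset of columns is not the same as DISTINCT over the whole row.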
Unfortunately, if your goal is an actual DISTINCT, it won't be so easy. One possible solution is to leverage Scala* Map hashing. You could define a Scala UDF like this:
spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)
and then use it in your Java code to derive a column that can be used with dropDuplicates:
df
    .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
    .dropDuplicates(
        // All columns excluding canvasHashes / hash_of_canvas_hashes
        "timestamp", "c1", "c2", ..., "cn",
        // Hash used as surrogate of canvasHashes
        "hash_of_canvas_hashes"
    )
    // The helper column is no longer needed after deduplication
    .drop("hash_of_canvas_hashes");
This is equivalent to the following SQL:
SELECT
  timestamp, c1, c2, ..., cn,  -- All columns excluding canvasHashes
  first(canvasHashes) AS canvasHashes
FROM df GROUP BY
  timestamp, c1, c2, ..., cn,  -- All columns excluding canvasHashes
  scalaHash(canvasHashes)      -- Hash used as surrogate of canvasHashes
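The pipeline above can be modelled in plain Java (no Spark) to show both what it achieves and its trade-off. The dedup key is every non-map column plus a 32-bit surrogate hash of the map. The class HashSurrogateDedup, its Row shape and the surrogateHash helper are hypothetical stand-ins for the Dataset and scalaHash; the sketch assumes non-null columns. Because any 32-bit hash (Scala's ## included) has collisions in principle, two different maps with equal hashes are treated as duplicates. With the helper used here, the strings "Aa" and "BB" have the same Java String.hashCode, which is enough to trigger that case.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class HashSurrogateDedup {

    // Hypothetical row shape: two plain columns plus the map column.
    static final class Row {
        final String timestamp;
        final String c1;
        final Map<String, String> canvasHashes;

        Row(String timestamp, String c1, Map<String, String> canvasHashes) {
            this.timestamp = timestamp;
            this.c1 = c1;
            this.canvasHashes = canvasHashes;
        }
    }

    // Stand-in for scalaHash(canvasHashes): depends only on the map's entries
    // because it hashes a key-sorted copy of the map.
    static int surrogateHash(Map<String, String> map) {
        return new TreeMap<>(map).toString().hashCode();
    }

    // Models dropDuplicates("timestamp", "c1", "hash_of_canvas_hashes"):
    // keeps the first row seen for each (timestamp, c1, hash) key.
    static List<Row> dedup(List<Row> rows) {
        Map<List<Object>, Row> firstPerKey = new LinkedHashMap<>();
        for (Row row : rows) {
            List<Object> key = List.of(row.timestamp, row.c1, surrogateHash(row.canvasHashes));
            firstPerKey.putIfAbsent(key, row);
        }
        return new ArrayList<>(firstPerKey.values());
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row("t1", "a", Map.of("k", "v")),
            new Row("t1", "a", Map.of("k", "v")),   // exact duplicate: dropped
            new Row("t1", "a", Map.of("k", "w")),   // different map: kept
            new Row("t2", "b", Map.of("k", "Aa")),
            new Row("t2", "b", Map.of("k", "BB"))); // different map, same hash: dropped
        System.out.println(dedup(rows).size()); // 3
    }
}
```

The last row is a true distinct row that the hash-based key wrongly discards; how much that risk matters depends on the data, and the model only illustrates that it exists.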
* Please note that java.util.Map with hashCode won't work here, because its hashCode is not consistent.
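Whatever hash is used, the property that matters is that maps with equal contents always get the same value, regardless of insertion order or map implementation. A minimal plain-Java illustration of that property, using a hypothetical helper (surrogateHash in a hypothetical class SurrogateHash) that hashes a key-sorted copy of the map rather than calling Map.hashCode directly; it assumes no null keys, since TreeMap rejects them:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

final class SurrogateHash {

    // Hypothetical content-only hash: sorting entries by key (TreeMap)
    // makes the result independent of insertion order.
    static int surrogateHash(Map<String, String> map) {
        return new TreeMap<>(map).toString().hashCode();
    }

    public static void main(String[] args) {
        Map<String, String> first = new LinkedHashMap<>();
        first.put("a", "1");
        first.put("b", "2");

        Map<String, String> second = new LinkedHashMap<>();
        second.put("b", "2");
        second.put("a", "1");

        // Same contents, different insertion order: same surrogate value.
        System.out.println(surrogateHash(first) == surrogateHash(second)); // true
    }
}
```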