Spark Dataframe: Select distinct rows

Question

I tried two ways to find distinct rows from Parquet, but neither seems to work.

Attempt 1: Dataset<Row> df = sqlContext.read().parquet("location.parquet").distinct();

But it throws:

Cannot have map type columns in DataFrame which calls set operations
(intersect, except, etc.), 
but the type of column canvasHashes is map<string,string>;;

Attempt 2: Tried running a SQL query:

Dataset<Row> df = sqlContext.read().parquet("location.parquet");
df.createOrReplaceTempView("df");
Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");

The error I get:

== SQL ==
SELECT distinct on timestamp * from df
-----------------------------^^^

Is there a way to get distinct records while reading Parquet files? Is there any read option I can use?

Answer

The problem you face is explicitly stated in the exception message: because MapType columns are neither hashable nor orderable, they cannot be used as part of a grouping or partitioning expression.

Your SQL attempt is not logically equivalent to distinct on a Dataset. If you want to deduplicate data based on a set of compatible columns, you should use dropDuplicates:

df.dropDuplicates("timestamp")

which is equivalent to

SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
       first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp
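
For illustration, here is a minimal Java sketch of a dropDuplicates call over several compatible columns; the SparkSession variable spark and the column names c1 and c2 are placeholders, not taken from the original question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Minimal sketch: deduplicate on a subset of hashable columns.
Dataset<Row> df = spark.read().parquet("location.parquet");

// Only the listed columns need to be hashable; the map column canvasHashes
// is simply carried along (Spark keeps the first value per group, as in the SQL above).
Dataset<Row> deduped = df.dropDuplicates("timestamp", "c1", "c2");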

Unfortunately, if your goal is an actual DISTINCT it won't be so easy. One possible solution is to leverage Scala* Map hashing. You could define a Scala UDF like this:

spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)

and then use it in your Java code to derive a column that can be used with dropDuplicates:

df
  .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
  .dropDuplicates(
    // All columns excluding canvasHashes / hash_of_canvas_hashes
    "timestamp", "c1", "c2", ..., "cn",
    // Hash used as surrogate of canvasHashes
    "hash_of_canvas_hashes"
  )

which is equivalent to the following SQL:

SELECT 
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  first(canvasHashes) AS canvasHashes
FROM df GROUP BY
  timestamp, c1, c2, ..., cn    -- All columns excluding canvasHashes
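
Putting the pieces together, here is a minimal end-to-end Java sketch (not code from the original answer). It assumes the Scala UDF scalaHash registered above is available on the same SparkSession, referred to here as spark, and uses c1 and c2 as stand-ins for the real non-map columns:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Read the data and add a hashable surrogate for the map column.
Dataset<Row> raw = spark.read().parquet("location.parquet");

Dataset<Row> distinctRows = raw
    .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
    // Deduplicate on all non-map columns plus the surrogate hash.
    .dropDuplicates("timestamp", "c1", "c2", "hash_of_canvas_hashes")
    // Drop the surrogate once deduplication is done.
    .drop("hash_of_canvas_hashes");

As with any hash-based surrogate, two different maps that happened to produce the same hash value would be collapsed into a single row, so this approximates rather than guarantees a true DISTINCT.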

* Please note that java.util.Map with its hashCode won't work, as hashCode is not consistent.
