SELECT DISTINCT Cassandra in Spark


Problem description

I need a query that lists out the unique composite partition keys inside of Spark.

The query in Cassandra, SELECT DISTINCT key1, key2, key3 FROM schema.table;, is quite fast, however putting the same sort of data filter in an RDD or spark.sql retrieves results incredibly slowly in comparison.

For example:

---- SPARK ----
var t1 = sc.cassandraTable("schema","table").select("key1", "key2", "key3").distinct()
var t2 = spark.sql("SELECT DISTINCT key1, key2, key3 FROM schema.table")

t1.count // takes 20 minutes
t2.count // takes 20 minutes

---- CASSANDRA ----
// takes < 1 minute while also printing out all results
SELECT DISTINCT key1, key2, key3 FROM schema.table; 

where the table format is as follows:

CREATE TABLE schema.table (
    key1 text,
    key2 text,
    key3 text,
    ckey1 text,
    ckey2 text,
    v1 int,
    PRIMARY KEY ((key1, key2, key3), ckey1, ckey2)
);

Doesn't Spark use Cassandra optimisations in its queries?

How can I retrieve this information efficiently?

Recommended answer

Quick answer

Doesn't Spark use Cassandra optimisations in its queries?

Yes, but with Spark SQL only column pruning and predicate pushdown are applied. In RDDs, pushdown is manual.
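
For example, on the RDD side the connector exposes its own select and where methods for manual pushdown (a minimal sketch against the example table above; the ckey1 predicate is just for illustration, and where() must be a predicate Cassandra itself can serve, e.g. on a clustering column):

import com.datastax.spark.connector._

// Generic RDD transformations such as filter() run in Spark after a full
// table scan. The connector's own select() and where() methods are instead
// translated into the CQL sent to Cassandra, so only the needed columns
// and rows ever leave the server.
val rdd = sc.cassandraTable("test", "tab")
  .select("key1", "key2", "key3")   // column pruning, pushed to C*
  .where("ckey1 = ?", "some-value") // predicate pushdown, pushed to C*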


How can I retrieve this information efficiently?

Since your request returns quickly enough, I would just use the Java Driver directly to get this result set.

While Spark SQL can provide some C*-based optimizations, these are usually limited to predicate pushdowns when using the DataFrame interface. This is because the framework only provides limited information to the datasource. We can see this by running an explain on the query you have written.

scala> spark.sql("SELECT DISTINCT key1, key2, key3 FROM test.tab").explain
== Physical Plan ==
*HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
+- Exchange hashpartitioning(key1#30, key2#31, key3#32, 200)
   +- *HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
      +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation test.tab[key1#30,key2#31,key3#32] ReadSchema: struct<key1:string,key2:string,key3:string>

So your Spark example will actually be broken into several steps:


  1. Scan: Read all of the data from this table. This means serializing every value from the C* machines to the Spark executor JVMs, in other words lots of work.
  2. *HashAggregate/Exchange/HashAggregate: Take the values from each executor, hash them locally, then exchange the data between machines and hash again to ensure uniqueness. In layman's terms, this means creating large hash structures, serializing them, running a complicated distributed sort-merge, then running a hash again. (Expensive)

Why doesn't any of this get pushed down to C*? Because the datasource (the CassandraSourceRelation in this case) is not given the information about the DISTINCT part of the query. This is just part of how Spark currently works; see the docs on what is pushable.

With RDDs we give a direct set of instructions to Spark. This means if you want to push something down, it must be manually specified. Let's see the debug output of the RDD request:

scala> sc.cassandraTable("test","tab").distinct.toDebugString
res2: String =
(13) MapPartitionsRDD[7] at distinct at <console>:45 []
 |   ShuffledRDD[6] at distinct at <console>:45 []
 +-(13) MapPartitionsRDD[5] at distinct at <console>:45 []
    |   CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:19 []

Here the issue is that your distinct call is a generic RDD operation and not specific to Cassandra. Since RDDs require all optimizations to be explicit (what you type is what you get), Cassandra never hears about this need for DISTINCT, and we get a plan that is almost identical to our Spark SQL version: do a full scan, serialize all of the data from Cassandra to Spark, do a shuffle, and then return the results.

With Spark SQL this is about as good as we can get without adding new rules to Catalyst (the Spark SQL/DataFrames optimizer) to let it know that Cassandra can handle some distinct calls at the server level. It would then need to be implemented for the CassandraRDD subclasses.

For RDDs we would need to add a function like the already existing where, select, and limit calls on the Cassandra RDD. A new distinct call could be added here, although it would only be allowable in specific situations. This function does not currently exist in the SCC, but could be added relatively easily, since all it would do is prepend DISTINCT to requests, probably with some checks to make sure the DISTINCT makes sense; a hypothetical sketch follows.
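
Purely to illustrate the shape such an addition could take (distinctKeys below is hypothetical and does not exist in the SCC; the comment shows the CQL it would have to generate):

// HYPOTHETICAL -- no such method exists in the Spark Cassandra Connector.
// It would only make sense when the selected columns are exactly the
// partition key columns, and would simply prepend DISTINCT to the CQL:
//   SELECT DISTINCT key1, key2, key3 FROM test.tab
val keys = sc.cassandraTable("test", "tab")
  .select("key1", "key2", "key3")
  .distinctKeys()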

Since we know the exact CQL request we would like to make, we can always use the Cassandra driver directly to get this information. The Spark Cassandra Connector provides a driver pool we can use, or we could just use the Java Driver natively. To use the pool, we would do something like:

import com.datastax.spark.connector.cql.CassandraConnector
CassandraConnector(sc.getConf).withSessionDo{ session => 
  session.execute("SELECT DISTINCT key1, key2, key3 FROM test.tab;").all()
}

Then parallelize the results if they are needed for further Spark work. If we really wanted to distribute this, it would most likely be necessary to add the function to the Spark Cassandra Connector as described above.
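
For example (a minimal sketch, assuming the distinct key set is small enough to hold on the driver):

import scala.collection.JavaConverters._
import com.datastax.spark.connector.cql.CassandraConnector

// Run the fast CQL query through the connector's session pool, convert the
// driver Rows to plain tuples, and hand the small result set back to Spark.
val keys = CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("SELECT DISTINCT key1, key2, key3 FROM test.tab;")
    .all().asScala
    .map(row => (row.getString("key1"), row.getString("key2"), row.getString("key3")))
}
val keysRdd = sc.parallelize(keys)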
