Distributed loading of a wide row into Spark from Cassandra

Question

Let's assume we have a Cassandra cluster with RF = N and a table containing wide rows.

Our table could have an index something like this: pk / ck1 / ck2 / ....

If we create an RDD from a row in the table as follows:

val wide_row = sc.cassandraTable(KS, TABLE).select("c1", "c2").where("pk = ?", PK)

I notice that one Spark node has 100% of the data and the others have none. I assume this is because the spark-cassandra-connector has no way of breaking down the query token range into smaller sub ranges because it's actually not a range -- it's simply the hash of PK.

At this point we could simply call repartition(N) to spread the data across the Spark cluster before processing, but this has the effect of moving data across the network to nodes that already have the data locally in Cassandra (remember RF = N).

What we would really like is for each Spark node to load its subset (slice) of the row locally from Cassandra.

One approach which came to mind is to generate an RDD containing a list of distinct values of the first cluster key (ck1) when pk = PK. We could then use mapPartitions() to load a slice of the wide row based on each value of ck1.

Assuming we already have our list values for ck1, we could write something like this:

val ck1_list = ....  // RDD of distinct ck1 values for pk = PK

// repartition returns a new RDD; it does not modify ck1_list in place
val ck1_partitioned = ck1_list.repartition(ck1_list.count().toInt) // one partition per value of ck1

val wide_row = ck1_partitioned.mapPartitions(f)

Within the partition iterator, f(), we would like to call another function g(pk, ck1) which loads the row slice from Cassandra for partition key pk and cluster key ck1. We could then apply flatMap to ck1_list so as to create a fully distributed RDD of the wide row without any shuffling.
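For illustration only, here is a minimal sketch of what g(pk, ck1) and f() could look like, assuming the spark-cassandra-connector's CassandraConnector is used to obtain a session inside each task (the connector caches sessions per executor, so the setup happens once per executor rather than once per task). The keyspace, table, and column names are the placeholders from the snippets above, and pk/ck1/c1/c2 are assumed to be text columns.

import com.datastax.spark.connector.cql.CassandraConnector
import scala.collection.JavaConverters._

// Serializable handle created on the driver; each executor lazily opens
// (and caches) its own session the first time it is used.
val connector = CassandraConnector(sc.getConf)

// g(pk, ck1): load one slice of the wide row with a plain CQL query.
def g(pk: String, ck1: String): Iterator[(String, String)] =
  connector.withSessionDo { session =>
    session.execute(
      s"SELECT c1, c2 FROM $KS.$TABLE WHERE pk = ? AND ck1 = ?", pk, ck1)
      .all().asScala.map(r => (r.getString("c1"), r.getString("c2"))).iterator
  }

// f(): the partition iterator passed to mapPartitions, as in the snippet above
def f(ck1s: Iterator[String]): Iterator[(String, String)] =
  ck1s.flatMap(ck1 => g(PK, ck1))

val wide_row = ck1_partitioned.mapPartitions(f)

Because the connector caches sessions, withSessionDo does not open a new connection per call; for heavy use one would also prepare the statement once per executor rather than passing the CQL string each time, but that is beyond this sketch.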

So here are the questions:

Is it possible to make a CQL call from within a Spark task? What driver should be used? Can it be set up only once and reused for subsequent tasks?

Any help would be greatly appreciated, thanks.

Answer

For the sake of future reference, I will explain how I solved this.

I actually used a slightly different method to the one outlined above, one which does not involve calling Cassandra from inside Spark tasks.

I started off with ck_list, a list of distinct values for the first cluster key when pk = PK. The code is not shown here, but I actually downloaded this list directly from Cassandra in the Spark driver using CQL.
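That driver-side lookup is not shown in the answer; a minimal sketch of how it might be done (assuming ck1 is a text column and reusing the KS, TBL, and PK placeholders from the snippets) is:

import com.datastax.spark.connector.cql.CassandraConnector
import scala.collection.JavaConverters._

// Runs in the Spark driver, not inside a task.
val ck_list: Seq[String] = CassandraConnector(sc.getConf).withSessionDo { session =>
  // CQL does not allow SELECT DISTINCT on a clustering column, so read the
  // ck1 column for the whole partition and de-duplicate on the client side.
  session.execute(s"SELECT ck1 FROM $KS.$TBL WHERE pk = ?", PK)
    .all().asScala.map(_.getString("ck1")).distinct
}

For a very wide partition this pulls every ck1 value through the driver, so in practice paging through the result (or otherwise bounding the list) may be necessary.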

I then transform ck_list into a list of RDDs. Next we combine the RDDs (each one representing a Cassandra row slice) into one unified RDD (wide_row).

The cast on the CassandraRDD is necessary because union returns type org.apache.spark.rdd.RDD, not a CassandraRDD.

After running the job I was able to verify that the wide_row had x partitions where x is the size of ck_list. A useful side effect is that wide_row is partitioned by the first cluster key, which is also the key I want to reduce by. Hence even more shuffling is avoided.
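A quick way to sanity-check the partition count (using the wide_row built in the snippet below) is simply:

println(wide_row.partitions.length)   // expected to equal ck_list.size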

I don't know if this is the best way to achieve what I wanted, but it certainly works.

val ck_list = ...   // list of first cluster key values where pk = PK (a plain Scala collection, not an RDD)

val wide_row = ck_list.map( ck =>
  // one CassandraRDD per ck1 value, each holding a slice of the wide row
  sc.cassandraTable(KS, TBL)
    .select("c1", "c2").where("pk = ? and ck1 = ?", PK, ck)
    // upcast so that the union inside reduce type-checks (union returns a plain RDD)
    .asInstanceOf[org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow]]
).reduce( (x, y) => x.union(y) )
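A small refinement, not part of the original answer: reducing with pairwise union nests UnionRDDs one inside another, whereas SparkContext.union combines the whole list in a single step. A sketch using the same names:

val wide_row = sc.union(
  ck_list.map( ck =>
    sc.cassandraTable(KS, TBL)
      .select("c1", "c2").where("pk = ? and ck1 = ?", PK, ck)
      .asInstanceOf[org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow]]
  )
)

Whether the nesting matters in practice depends on how large ck_list is.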
