Distributed loading of a wide row into Spark from Cassandra


Question

Let's assume we have a Cassandra cluster with RF = N and a table containing wide rows.

Our table could have an index something like this: pk / ck1 / ck2 / ....

If we create an RDD from a row in the table as follows:

val wide_row = sc.cassandraTable(KS, TABLE).select("c1", "c2").where("pk = ?", PK)

I notice that one Spark node has 100% of the data and the others have none. I assume this is because the spark-cassandra-connector has no way of breaking down the query token range into smaller sub ranges because it's actually not a range -- it's simply the hash of PK.

At this point we could simply call repartition(N) to spread the data across the Spark cluster before processing, but this has the effect of moving data across the network to nodes that already have the data locally in Cassandra (remember RF = N).

What we would really like is for each Spark node to load a subset (slice) of the row locally from Cassandra.

One approach which came to mind is to generate an RDD containing a list of distinct values of the first cluster key (ck1) when pk = PK. We could then use mapPartitions() to load a slice of the wide row based on each value of ck1.

Assuming we already have our list values for ck1, we could write something like this:

val ck1_list = ....  // RDD

// repartition returns a new RDD rather than modifying ck1_list in place,
// so the result must be captured
val ck1_partitioned = ck1_list.repartition(ck1_list.count().toInt) // one partition per value of ck1

val wide_row = ck1_partitioned.mapPartitions(f)

Within the partition iterator, f(), we would like to call another function g(pk, ck1) which loads the row slice from Cassandra for partition key pk and cluster key ck1. We could then apply flatMap to ck1_list so as to create a fully distributed RDD of the wide row without any shuffling.

So here are the questions:

Is it possible to make a CQL call from within a Spark task? What driver should be used? Can it be set up once and reused for subsequent tasks?
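For reference, the spark-cassandra-connector does make this possible: its CassandraConnector is serializable and maintains a per-JVM session pool on the executors, so it can be built once in the driver and reused by every task. A sketch under assumptions (ck1_rdd, the keyspace/table names, and the text-typed columns are hypothetical):

```scala
import scala.collection.JavaConverters._
import com.datastax.spark.connector.cql.CassandraConnector

// Built once in the driver and shipped to executors with each task closure;
// executors share a pooled session per JVM rather than reconnecting per task.
val connector = CassandraConnector(sc.getConf)

val slices = ck1_rdd.mapPartitions { cks =>
  // withSessionDo borrows a pooled session and returns it when the block ends,
  // so results are materialized with toList before the iterator is returned
  connector.withSessionDo { session =>
    cks.flatMap { ck =>
      session.execute(
        "SELECT c1, c2 FROM ks.tbl WHERE pk = ? AND ck1 = ?",
        PK.asInstanceOf[AnyRef], ck.asInstanceOf[AnyRef]
      ).all().asScala
    }.toList.iterator
  }
}
```

This uses the DataStax Java driver session that the connector manages internally, which answers the "what driver" question without adding a second dependency.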

Any help would be greatly appreciated, thanks.

Answer

For the sake of future reference, I will explain how I solved this.

I actually used a slightly different method to the one outlined above, one which does not involve calling Cassandra from inside Spark tasks.

I started off with ck_list, a list of distinct values for the first cluster key when pk = PK. The code is not shown here, but I actually downloaded this list directly from Cassandra in the Spark driver using CQL.
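That driver-side fetch could look something like the following sketch (keyspace, table, and the assumption that ck1 is text are illustrative; note SELECT DISTINCT only applies to partition keys, so clustering-key values are deduplicated client-side):

```scala
import scala.collection.JavaConverters._
import com.datastax.spark.connector.cql.CassandraConnector

// Runs in the driver, not in a task: fetch all ck1 values for the partition,
// then deduplicate locally since CQL cannot DISTINCT a clustering column
val ck_list = CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("SELECT ck1 FROM ks.tbl WHERE pk = ?", PK.asInstanceOf[AnyRef])
    .all().asScala
    .map(_.getString("ck1")) // hypothetical: assumes ck1 is a text column
    .distinct
    .toList
}
```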

I then transform ck_list into a list of RDDs. Next we combine the RDDs (each one representing a Cassandra row slice) into one unified RDD (wide_row).

The cast on CassandraRDD is necessary because union returns type org.apache.spark.rdd.RDD

After running the job I was able to verify that the wide_row had x partitions where x is the size of ck_list. A useful side effect is that wide_row is partitioned by the first cluster key, which is also the key I want to reduce by. Hence even more shuffling is avoided.

I don't know if this is the best way to achieve what I wanted, but it certainly works.

val ck_list // list first cluster key values where pk = PK

val wide_row = ck_list.map( ck =>
  sc.cassandraTable(KS, TBL)
    .select("c1", "c2").where("pk = ? and ck1 = ?", PK, ck)
    // cast to the parameterized RDD type: a bare RDD will not compile,
    // and union is defined on RDD rather than CassandraRDD
    .asInstanceOf[org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow]]
).reduce( (x, y) => x.union(y) )
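The partitioning side effect mentioned above can then be exploited directly. Since each partition of wide_row holds rows for a single ck1 value, a reduce keyed on ck1 collapses every partition map-side before the shuffle, so only one record per partition crosses the network. A sketch, assuming a numeric column c2 to aggregate:

```scala
import com.datastax.spark.connector.CassandraRow

// Each partition contains one ck1 value, so reduceByKey's map-side
// combiner reduces a whole partition to a single record locally
val totals = wide_row
  .map { row: CassandraRow => (row.getString("ck1"), row.getDouble("c2")) } // hypothetical column types
  .reduceByKey(_ + _)
```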

