Filter from Cassandra table by RDD values


Problem description

I'd like to query some data from Cassandra based on values I have in an RDD. My approach is the following:

val userIds = sc.textFile("/tmp/user_ids").keyBy( e => e ) 
val t = sc.cassandraTable("keyspace", "users").select("userid", "user_name") 
val userNames = userIds.flatMap { userId => 
  t.where("userid = ?", userId).take(1) 
} 
userNames.take(1) 

While the Cassandra query works in the Spark shell, it throws an exception when I use it inside flatMap:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException: 
        org.apache.spark.rdd.RDD.<init>(RDD.scala:125) 
        com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49) 
        com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83) 
        com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:94) 

My understanding is that I cannot produce an RDD (Cassandra results) inside another RDD.

The examples I found on the web read the whole Cassandra table into an RDD and join the RDDs (like this: https://cassandrastuff.wordpress.com/2014/07/07/cassandra-and-spark-table-joins/). But that won't scale if the Cassandra table is huge.

But how do I approach this problem instead?

Recommended answer

Spark 1.2 introduces joinWithCassandraTable

val userids = sc.textFile("file:///Users/russellspitzer/users.list")
userids
 .map(Tuple1(_))
 .joinWithCassandraTable("keyspace","table")

This code will end up doing work identical to the solution below. The joinWithCassandraTable method uses the same code as saveToCassandra to transform classes into something Cassandra can understand. This is why we need a tuple rather than just a simple string to perform the join.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable
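
The Tuple1 wrapping in the snippet above can be illustrated in plain Scala (hypothetical IDs, no Spark needed): each ID string becomes a one-element tuple, so the connector can treat it as a partition-key value rather than a bare string.

```scala
// Hypothetical IDs, standing in for what sc.textFile would produce.
val ids = List("3", "2", "1")

// joinWithCassandraTable expects key tuples, not bare values,
// so each ID is wrapped in a Tuple1 before the join.
val keyed = ids.map(Tuple1(_))
```

With a composite partition key, the wrapping would be a TupleN matching the key columns instead of Tuple1.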

I think what you actually want to do here is an inner join on the two data sources. This should actually be faster than the flatMap approach as well, since there is some internal smart hashing.

scala> val userids = sc.textFile("file:///Users/russellspitzer/users.list")
scala> userids.take(5)
res19: Array[String] = Array(3, 2)

scala> sc.cassandraTable("test","users").collect
res20: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{userid: 3, username: Jacek}, CassandraRow{userid: 1, username: Russ}, CassandraRow{userid: 2, username: Helena})

scala> userids.map(line => (line.toInt,true)).join(sc.cassandraTable("test","users").map(row => (row.getInt("userid"),row.getString("username")))).collect
res18: Array[(Int, (Boolean, String))] = Array((2,(true,Helena)), (3,(true,Jacek)))
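
The inner-join semantics in the transcript above can be sketched with plain Scala collections (hypothetical data standing in for the two RDDs, no cluster needed):

```scala
// Hypothetical user IDs as read from the text file (strings).
val userIdLines = List("3", "2")

// Hypothetical Cassandra rows, keyed by userid.
val users = Map(1 -> "Russ", 2 -> "Helena", 3 -> "Jacek")

// Key the IDs and keep only those with a matching row, mirroring
// userids.map(line => (line.toInt, true)).join(cassandraPairs).
val joined = userIdLines.map(_.toInt).flatMap(id => users.get(id).map(name => (id, name)))
```

The RDD join additionally hash-partitions both sides by key across the cluster; this collection version only mirrors the result, not the distribution.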

If you actually just want to execute a bunch of primary-key queries against your C* database, you may be better off executing them through the normal driver pathways instead of using Spark.

import com.datastax.spark.connector.cql.CassandraConnector
import collection.JavaConversions._

// Reuse the connector's pooled sessions to run one primary-key query per ID.
val cc = CassandraConnector(sc.getConf)
val select = "SELECT * FROM cctest.users where userid=?"
val ids = sc.parallelize(1 to 10)
ids.flatMap(id =>
  cc.withSessionDo(session =>
    session.execute(select, id: java.lang.Integer).iterator.toList.map(row =>
      (row.getInt("userid"), row.getString("username"))))).collect
