动态将columnNames传递给cassandraTable().select() [英] Pass columnNames dynamically to cassandraTable().select()

查看:90
本文介绍了动态将columnNames传递给cassandraTable().select()的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在运行时从文件中读取查询,并在SPark + Cassandra环境中执行它.

I'm reading query off of a file at run-time and executing it on the SPark+Cassandra environment.

我正在执行:
sparkContext.cassandraTable.("keyspaceName", "colFamilyName").select("col1", "col2", "col3").where("some condition = true")

I'm executing :
sparkContext.cassandraTable.("keyspaceName", "colFamilyName").select("col1", "col2", "col3").where("some condition = true")

查询文件:

选择col1,col2,col3 来自keyspaceName.colFamilyName 某些条件为真

select col1, col2, col3 from keyspaceName.colFamilyName where somecondition = true

此处Col1,col2,col3可能会有所不同,具体取决于从文件中解析的查询.

Here Col1,col2,col3 can vary depending on the query parsed from the file.

问题:
如何从查询中选择columnName并将其传递给select()和运行时.

Question :
How do I pick columnName from query and pass them to select() and runtime.

我已经尝试了许多方法来实现它:
1.最愚蠢的事情(显然会引发错误)-

I have tried many ways to do it :
1. dumbest thing done (which obviously threw an error) -

var str = "col1,col2,col3"
var selectStmt = str.split("\\,").map { x => "\"" + x.trim() + "\"" }.mkString(",")
var queryRDD = sc.cassandraTable().select(selectStmt)

任何想法都欢迎.

注意事项:
1.我不想使用cassandraCntext,因为它将在下一个版本中被删除/删除( https://docs.datastax.com/en/datastax_enterprise/4.5/datastax_enterprise/spark/sparkCCcontext.html )
2.我在
- 一种. Scala 2.11
-b. spark-cassandra-connector_2.11:1.6.0-M1
- C. Spark 1.6

Side Notes :
1. I do not want to use cassandraCntext becasue it will be depricated/ removed in next realase (https://docs.datastax.com/en/datastax_enterprise/4.5/datastax_enterprise/spark/sparkCCcontext.html)
2. I'm on
- a. Scala 2.11
- b. spark-cassandra-connector_2.11:1.6.0-M1
- c. Spark 1.6

推荐答案

使用Cassandra连接器

您的用例听起来像您实际上要使用CassandraConnector对象.这些使您可以直接访问每个ExecutorJVM会话池,并且非常适合仅执行随机查询.这样最终将比为每个查询创建一个RDD更为有效.

Use Cassandra Connector

Your use case sounds like you actually want to use CassandraConnector Objects. These give you a direct access to a per ExecutorJVM session pool and are ideal for just executing random queries. This will end up being much more efficient than creating an RDD for each query.

这看起来像

rddOfStatements.mapPartitions( it => 
  CassandraConnector.withSessionDo { session => 
    it.map(statement => 
      session.execute(statement))})

但是您很可能希望使用executeAsync并分别处理期货以获得更好的性能.

But you most likely would want to use executeAsync and handle the futures separately for better performance.

select方法采用ColumnRef*,这意味着您需要传递一定数量的ColumnRef.通常,String->有一个隐式转换. ColumnRef这就是为什么您只能传递字符串的var-args的原因.

The select method takes ColumnRef* which means you need to pass in some number of ColumnRefs. Normally there is an implicit conversion from String --> ColumnRef which is why you can pass in just a var-args of strings.

这有点复杂,因为我们想传递另一种类型的var args,所以我们最终得到了两次隐式隐式,而Scala则不喜欢这样.

Here it's a little more complicated because we want to pass var args of another type so we end up with double implicits and Scala doesn't like that.

因此,我们改为将ColumnName对象作为varargs(:_ *)

So instead we pass in ColumnName objects as varargs (:_*)

========================================
 Keyspace: test
========================================
 Table: dummy
----------------------------------------
 - id                      : java.util.UUID                                                                   (partition key column)
 - txt                     : String


val columns = Seq("id", "txt")
columns: Seq[String] = List(id, txt)

//Convert the strings to ColumnNames (a subclass of ColumnRef) and treat as var args
sc.cassandraTable("test","dummy")
  .select(columns.map(ColumnName(_)):_*)
  .collect      

Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693, txt: hello world})

//Only use the first column
sc.cassandraTable("test","dummy")
  .select(columns.map(ColumnName(_)).take(1):_*)
  .collect

Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693})

//Only use the last column        
sc.cassandraTable("test","dummy")
  .select(columns.map(ColumnName(_)).takeRight(1):_*)
  .collect

Array(CassandraRow{txt: hello world})

这篇关于动态将columnNames传递给cassandraTable().select()的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆