Pass columnNames dynamically to cassandraTable().select()


Problem description

I'm reading a query from a file at run time and executing it in a Spark + Cassandra environment.

I'm executing:
sparkContext.cassandraTable("keyspaceName", "colFamilyName").select("col1", "col2", "col3").where("some condition = true")

Query in the file:

select col1, col2, col3 from keyspaceName.colFamilyName where somecondition = true

Here col1, col2, col3 can vary depending on the query parsed from the file.

Question:
How do I pick the column names out of the query and pass them to select() at run time?

I have tried many ways to do it:
1. The dumbest thing I tried (which obviously threw an error):

var str = "col1,col2,col3"
var selectStmt = str.split(",").map { x => "\"" + x.trim() + "\"" }.mkString(",")
var queryRDD = sc.cassandraTable().select(selectStmt)

Any ideas are welcome.

Side notes:
1. I do not want to use cassandraContext because it will be deprecated/removed in the next release (https://docs.datastax.com/en/datastax_enterprise/4.5/datastax_enterprise/spark/sparkCCcontext.html)
2. I'm on
- a. Scala 2.11
- b. spark-cassandra-connector_2.11:1.6.0-M1
- c. Spark 1.6
Recommended answer

Use Cassandra Connector

Your use case sounds like you actually want to use CassandraConnector objects. These give you direct access to a per-executor-JVM session pool and are ideal for executing arbitrary queries. This will end up being much more efficient than creating an RDD for each query.

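This would look something like

import com.datastax.spark.connector.cql.CassandraConnector
val connector = CassandraConnector(sc.getConf)  // serializable, so it can be used inside the closure
rddOfStatements.mapPartitions { it =>
  connector.withSessionDo { session =>
    // Force evaluation here so the statements run while the session is still borrowed
    it.map(statement => session.execute(statement)).toList.iterator
  }
}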

But you would most likely want to use executeAsync and handle the futures separately for better performance.

The select method takes ColumnRef*, which means you need to pass in some number of ColumnRefs. Normally there is an implicit conversion from String to ColumnRef, which is why you can pass in plain strings as varargs.

Here it's a little more complicated because we want to pass varargs of another type, so we would end up needing two implicit conversions in a row, and Scala does not chain them.

So instead we pass in ColumnName objects as varargs (: _*)
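
To make the implicit-conversion point concrete, here is a minimal sketch in plain Scala that does not depend on the connector. The ColumnRef and ColumnName types and the select method below are hypothetical stand-ins written only for illustration; they imitate the shape of the connector's API, not its actual implementation.

```scala
import scala.language.implicitConversions

// Hypothetical stand-ins for the connector's types, for illustration only.
sealed trait ColumnRef { def name: String }
final case class ColumnName(name: String) extends ColumnRef

object ColumnRef {
  // Imitates the idea of a String-to-ColumnRef implicit conversion.
  implicit def stringToColumnRef(s: String): ColumnRef = ColumnName(s)
}

object VarargsDemo {
  // Same shape as a select(columns: ColumnRef*) method.
  def select(columns: ColumnRef*): Seq[String] = columns.map(_.name)

  def main(args: Array[String]): Unit = {
    // Works: each String argument is converted to a ColumnRef individually.
    println(select("col1", "col2"))

    val cols = Seq("col1", "col2")
    // select(cols: _*) would not compile: nothing converts Seq[String] to Seq[ColumnRef].
    // Mapping explicitly to ColumnName and expanding with ": _*" does compile.
    println(select(cols.map(ColumnName(_)): _*))
  }
}
```

The explicit map does the conversion that the compiler will not do for a whole sequence, and `: _*` then expands the resulting Seq[ColumnName] into the varargs parameter.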

========================================
 Keyspace: test
========================================
 Table: dummy
----------------------------------------
 - id                      : java.util.UUID                                                                   (partition key column)
 - txt                     : String


val columns = Seq("id", "txt")
columns: Seq[String] = List(id, txt)

//Convert the strings to ColumnNames (a subclass of ColumnRef) and treat as var args
sc.cassandraTable("test","dummy")
  .select(columns.map(ColumnName(_)):_*)
  .collect      

Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693, txt: hello world})

//Only use the first column
sc.cassandraTable("test","dummy")
  .select(columns.map(ColumnName(_)).take(1):_*)
  .collect

Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693})

//Only use the last column        
sc.cassandraTable("test","dummy")
  .select(columns.map(ColumnName(_)).takeRight(1):_*)
  .collect

Array(CassandraRow{txt: hello world})
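
To connect this back to the question, the column names still have to be extracted from the query text read from the file. The following is a rough sketch of one way to do that with a regular expression. The QueryColumns object and its parse method are hypothetical names introduced here, and the sketch assumes a simple "select a, b, c from ..." query with plain column names (no functions, aliases, or nested commas).

```scala
object QueryColumns {
  // Case-insensitive match on "select <columns> from ..."; (?s) lets the query span lines.
  private val SelectPattern = """(?is)^\s*select\s+(.+?)\s+from\s.*""".r

  // Returns the trimmed column names, or an empty Seq if the text is not a simple select.
  def parse(query: String): Seq[String] = query match {
    case SelectPattern(cols) => cols.split(",").map(_.trim).filter(_.nonEmpty).toSeq
    case _                   => Seq.empty
  }

  def main(args: Array[String]): Unit = {
    val query = "select col1, col2, col3 from keyspaceName.colFamilyName where somecondition = true"
    parse(query).foreach(println)
  }
}
```

The resulting Seq[String] could then be passed in the same way as `columns` above, for example `.select(QueryColumns.parse(query).map(ColumnName(_)): _*)`. A query such as `select *` would yield the single entry `*` and would need separate handling.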
