Pass columnNames dynamically to cassandraTable().select()
Problem description
I'm reading a query from a file at run time and executing it in a Spark + Cassandra environment.
I'm executing:
sparkContext.cassandraTable("keyspaceName", "colFamilyName").select("col1", "col2", "col3").where("some condition = true")
Query file:
select col1, col2, col3 from keyspaceName.colFamilyName where somecondition = true
Here col1, col2 and col3 can vary depending on the query parsed from the file.
Question:
How do I pick the column names out of the query and pass them to select() at run time?
I have tried many ways to do it:
1. The dumbest thing I tried (which obviously threw an error):
var str = "col1,col2,col3"
var selectStmt = str.split("\\,").map { x => "\"" + x.trim() + "\"" }.mkString(",")
var queryRDD = sc.cassandraTable().select(selectStmt)
Any ideas are welcome.
Side notes:
1. I do not want to use CassandraContext because it will be deprecated/removed in the next release (https://docs.datastax.com/en/datastax_enterprise/4.5/datastax_enterprise/spark/sparkCCcontext.html)
2. I'm on
- a. Scala 2.11
- b. spark-cassandra-connector_2.11:1.6.0-M1
- c. Spark 1.6
Recommended answer
Use Cassandra Connector
Your use case sounds like you actually want to use CassandraConnector objects. These give you direct access to a per-executor-JVM session pool and are ideal for executing arbitrary queries. This will end up being much more efficient than creating an RDD for each query.
This would look something like:
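import com.datastax.spark.connector.cql.CassandraConnector
val connector = CassandraConnector(sc.getConf) // serializable, built once on the driver
rddOfStatements.mapPartitions { it =>
  connector.withSessionDo { session =>
    // Iterator.map is lazy, so force execution while the session is still open
    it.map(statement => session.execute(statement)).toList.iterator
  }
}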
But you would most likely want to use executeAsync and handle the futures separately for better performance.
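As a rough sketch of what "handle the futures separately" could mean: submit a batch of statements first, and only then wait for their results, so the requests overlap instead of running one by one. The helper name `inBatches` and the batch size are assumptions made for illustration; they are not part of the connector. The commented usage assumes the `executeAsync` and `getUninterruptibly` calls of the 2.x/3.x DataStax Java driver.

```scala
// Hypothetical helper: submits every item of a batch, then waits for that batch.
// `submit` starts an asynchronous request, `await` blocks until it completes.
def inBatches[A, F, R](items: Iterator[A], batchSize: Int)(submit: A => F)(await: F => R): Iterator[R] =
  items.grouped(batchSize).flatMap { batch =>
    val pending = batch.map(submit) // eager: the whole batch is in flight after this line
    pending.map(await)              // wait only after everything in the batch was submitted
  }

// Intended use inside mapPartitions (needs a live cluster, so shown as a comment):
// connector.withSessionDo { session =>
//   inBatches(it, 100)(stmt => session.executeAsync(stmt))(_.getUninterruptibly())
//     .toList.iterator
// }
```

Bounding the batch size keeps the number of in-flight requests per task limited; the right value depends on the cluster and is not something the original answer specifies.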
The select method takes ColumnRef*, which means you need to pass in some number of ColumnRefs. Normally there is an implicit conversion from String --> ColumnRef, which is why you can pass in just a var-args of strings.
Here it's a little more complicated because we want to pass a sequence as varargs, so we would need the implicit conversion applied on top of the varargs expansion, and Scala doesn't do that: a Seq[String] is not converted element by element into a Seq[ColumnRef].
So instead we convert the strings to ColumnName objects ourselves and pass them as varargs (:_*).
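The behaviour can be reproduced without Spark. The sketch below uses simplified stand-ins for the connector's ColumnRef and ColumnName types and a hypothetical `select` function, purely to show which calls compile; it is not the connector's actual code.

```scala
import scala.language.implicitConversions

// Simplified stand-ins for the connector's types (assumptions for illustration)
sealed trait ColumnRef { def name: String }
case class ColumnName(name: String) extends ColumnRef
implicit def toColumnRef(s: String): ColumnRef = ColumnName(s)

// Hypothetical select with the same varargs shape as the real method
def select(columns: ColumnRef*): Seq[String] = columns.map(_.name)

// Compiles: each String argument is converted individually
val fromLiterals = select("id", "txt")

val columns = Seq("id", "txt")
// select(columns: _*)  // does not compile: Seq[String] is not a Seq[ColumnRef]

// Compiles: convert explicitly first, then expand as varargs
val fromSeq = select(columns.map(ColumnName(_)): _*)
```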
========================================
Keyspace: test
========================================
Table: dummy
----------------------------------------
- id : java.util.UUID (partition key column)
- txt : String
val columns = Seq("id", "txt")
columns: Seq[String] = List(id, txt)
//Convert the strings to ColumnNames (a subclass of ColumnRef) and treat as var args
sc.cassandraTable("test","dummy")
.select(columns.map(ColumnName(_)):_*)
.collect
Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693, txt: hello world})
//Only use the first column
sc.cassandraTable("test","dummy")
.select(columns.map(ColumnName(_)).take(1):_*)
.collect
Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693})
//Only use the last column
sc.cassandraTable("test","dummy")
.select(columns.map(ColumnName(_)).takeRight(1):_*)
.collect
Array(CassandraRow{txt: hello world})
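To tie this back to the question, the column names still have to be extracted from the query text read from the file. A minimal sketch, assuming the file always contains a plain "select a, b, c from keyspace.table ..." statement; `selectedColumns` is a hypothetical helper, not a CQL parser, and it does not handle `*`, aliases or function calls.

```scala
// Hypothetical helper: returns the column names between "select" and "from".
def selectedColumns(query: String): Seq[String] = {
  // (?is) = case-insensitive, and '.' also matches newlines
  val Projection = """(?is)^\s*select\s+(.+?)\s+from\s.*""".r
  query match {
    case Projection(cols) => cols.split(",").map(_.trim).filter(_.nonEmpty).toList
    case _                => Seq.empty // not a statement of the assumed shape
  }
}

val parsed = selectedColumns(
  "select col1, col2, col3 from keyspaceName.colFamilyName where somecondition = true")
```

The result can then be passed on in the same way as above, for example .select(selectedColumns(query).map(ColumnName(_)):_*).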