Importing cassandra table into spark via sparklyr - possible to select only some columns?
Question
I've been working with sparklyr to bring large Cassandra tables into Spark, register them in R, and conduct dplyr operations on them.
I have been successfully importing Cassandra tables with code that looks like this:
# import cassandra table into spark
cass_df <- sparklyr:::spark_data_read_generic(
  sc, "org.apache.spark.sql.cassandra", "format",
  list(keyspace = "cass_keyspace", table = "cass_table")
) %>%
  invoke("load")

# register table in R
cass_tbl <- sparklyr:::spark_partition_register_df(
  sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE
)
Some of these Cassandra tables are pretty large (> 8.5 bn rows) and take a while to import/register, and some lead to memory overruns, even with 6 nodes running a total of 60 cores and 192 GB RAM. However, I typically need only a few of the columns from each Cassandra database.
My questions are:
- Is it possible to filter the Cassandra database on import/registration so that it only imports some columns, or so that it is filtered on the primary key (i.e. by passing SQL/CQL-type queries such as SELECT name FROM cass_table WHERE id = 5)?
- Where would such a query go in the above code, and what form does the syntax take?
I have tried adding such a query as an additional option in the options list, i.e.:
list(..., select = "id")
as well as invoking it as a separate pipe before %>% invoke("load"), i.e.:
invoke("option", "select", "id") %>%
# OR
invoke("option", "query", "select id from cass_table") %>%
But these do not work. Any suggestions?
Accepted answer
You can skip the eager cache and select only the columns of interest:
session <- spark_session(sc)

# Some columns to select
cols <- list("x", "y", "z")

cass_df <- session %>%
  invoke("read") %>%
  invoke("format", "org.apache.spark.sql.cassandra") %>%
  # the Cassandra source needs both keyspace and table options
  invoke("options", as.environment(
    list(keyspace = "cass_keyspace", table = "cass_table"))) %>%
  invoke("load") %>%
  # This calls select(col: String, cols: String*), so the first column
  # has to be passed separately. If you want only one column, the second
  # argument has to be an empty list.
  invoke("select", cols[[1]], cols[2:length(cols)]) %>%
  # Standard lazy cache if you need one
  invoke("cache")
If you use predicates which can significantly reduce the amount of data fetched, set the pushdown option to "true" (the default) and use filter before caching.
If you want to pass a more complex query, register a temporary view and use the sql method:
session %>%
  invoke("read") %>%
  ...
  invoke("load") %>%
  invoke("createOrReplaceTempView", "some_name")

cass_df <- session %>%
  invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
  invoke("cache")
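If you would rather avoid the internal ::: calls, newer sparklyr releases expose spark_read_source(). A hedged sketch (the column names id/name are taken from the question; memory = FALSE keeps the load lazy, so a later select()/filter() can be pushed down before anything is materialized):

```r
library(sparklyr)
library(dplyr)

# Lazy load: memory = FALSE avoids eagerly caching the full table
cass_tbl <- spark_read_source(
  sc, name = "cass_table",
  source = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "cass_keyspace", table = "cass_table"),
  memory = FALSE)

# dplyr verbs are translated to Spark SQL and applied before
# any data is collected
result <- cass_tbl %>%
  select(id, name) %>%
  filter(id == 5)
```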