Importing cassandra table into spark via sparklyr - possible to select only some columns?


Question

I've been working with sparklyr to bring large cassandra tables into spark, register these with R and conduct dplyr operations on them.

I have been successfully importing cassandra tables with code that looks like this:

# import cassandra table into spark

cass_df <- sparklyr:::spark_data_read_generic(
  sc, "org.apache.spark.sql.cassandra", "format", 
  list(keyspace = "cass_keyspace", table = "cass_table")
  ) %>% 
  invoke("load")


# register table in R

cass_tbl <- sparklyr:::spark_partition_register_df(
  sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE
)

Some of these cassandra tables are pretty large (> 8.5bn rows) and take a while to import/register, and some lead to memory overruns, even with 6 nodes running a total of 60 cores and 192gb RAM. However, I typically only need a few of the columns from each cassandra database.

My questions are:

  1. Is it possible to filter the cassandra database on import/registration so that it only imports some columns or so that it is filtered on the primary key (i.e. by passing SQL / CQL type queries such as SELECT name FROM cass_table WHERE id = 5)?
  2. Where would such a query go in the above code, and what form does the syntax take?

I have tried adding such a query as an additional option in the options list, i.e.:

list(..., select = "id")

as well as invoking it as a separate pipe before %>% invoke("load"), i.e.:

invoke("option", "select", "id") %>%

# OR

invoke("option", "query", "select id from cass_table") %>%

But these do not work. Any suggestions?

Answer

You can skip the eager cache and select only the columns of interest:

session <- spark_session(sc)

# Some columns to select
cols <- list("x", "y", "z")

cass_df <- session %>% 
  invoke("read") %>% 
  invoke("format", "org.apache.spark.sql.cassandra") %>% 
  invoke("options", as.environment(list(keyspace = "cass_keyspace", table = "cass_table"))) %>% 
  invoke("load") %>% 
  # select has the signature select(col: String, cols: String*), so the
  # first column is passed separately. If you want only one column, the
  # second argument has to be an empty list.
  invoke("select", cols[[1]], cols[2:length(cols)]) %>%
  # Standard lazy cache if you need one
  invoke("cache")

If you use predicates that can significantly reduce the amount of data fetched, set the pushdown option to "true" (the default) and apply a filter before caching.
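For example, such a filter could go in the same invoke pipeline, between load and cache. This is a sketch assuming the same hypothetical keyspace/table names as above, an `id` partition key, and a live Spark connection `sc`:

```r
library(sparklyr)

session <- spark_session(sc)

cass_df <- session %>%
  invoke("read") %>%
  invoke("format", "org.apache.spark.sql.cassandra") %>%
  invoke("options", as.environment(list(keyspace = "cass_keyspace", table = "cass_table"))) %>%
  invoke("load") %>%
  # With pushdown enabled (the default), a predicate on the partition key
  # is translated into a CQL WHERE clause and evaluated by Cassandra
  invoke("filter", "id = 5") %>%
  # Single column: first argument is the column, second an empty list
  invoke("select", "name", list()) %>%
  invoke("cache")
```

Filtering before the cache matters: only the reduced result is materialized in Spark's memory, rather than the full 8.5bn-row table.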

If you want to pass a more complex query, register a temporary view and use the sql method:

session %>%
  invoke("read") %>% 
  ...
  invoke("load") %>% 
  invoke("createOrReplaceTempView", "some_name")

cass_df <- session %>% 
  invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
  invoke("cache")
