Delete from cassandra Table in Spark


Question

I'm using Spark with Cassandra, and I'm reading some rows from my table in order to delete them using the primary key. This is my code:

val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
  select("a", "b", "c", "d").
  where("d = ?", d).cache()

lines.foreach(r => {
    val session: Session = connector.openSession
    val delete = s"DELETE FROM $CASSANDRA_SCHEMA.$table WHERE channel='${r._1}' AND ctid='${r._2}' AND cvid='${r._3}'"
    session.execute(delete)
    session.close()
})

But this method creates a session for each row, which takes a lot of time. Is it possible to delete my rows using sc.cassandraTable, or is there a better solution than mine?

Thanks

Answer

I don't think delete is supported in the Cassandra Connector at the moment. To amortize the cost of connection setup, the recommended approach is to apply the operation to each partition.

So your code would look like this:

lines.foreachPartition(partition => {
    val session: Session = connector.openSession // once per partition
    partition.foreach { elem =>
        val delete = s"DELETE FROM $CASSANDRA_SCHEMA.$table WHERE channel='${elem._1}' AND ctid='${elem._2}' AND cvid='${elem._3}'"
        session.execute(delete)
    }
    session.close()
})

You could also look into using DELETE FROM ... WHERE pk IN (list), building up the list for each partition in a similar way. This will be even more performant, but might break with very large partitions, since the list would become correspondingly long. Repartitioning your target RDD before applying this function will help.
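A minimal sketch of that IN-list variant, assuming cvid is the last clustering column of this table (so rows sharing the same (channel, ctid) can be deleted in one statement). The statement-building part is plain Scala; the `buildDeletes` helper name is hypothetical, and the connector usage is shown in comments:

```scala
// Hypothetical helper: group a partition's rows by (channel, ctid) and
// emit one DELETE with an IN list over cvid per group. Assumes cvid is
// the last clustering key, which is what makes the IN clause valid CQL.
def buildDeletes(rows: Iterator[(String, String, String)],
                 keyspace: String, table: String): Iterator[String] =
  rows.toSeq
    .groupBy { case (channel, ctid, _) => (channel, ctid) }
    .map { case ((channel, ctid), group) =>
      val inList = group.map { case (_, _, cvid) => s"'$cvid'" }.mkString(", ")
      s"DELETE FROM $keyspace.$table " +
        s"WHERE channel='$channel' AND ctid='$ctid' AND cvid IN ($inList)"
    }
    .iterator

// Sketch of how it would plug into foreachPartition (needs the connector):
// lines.foreachPartition { partition =>
//   val session = connector.openSession
//   buildDeletes(partition.map(r => (r._1, r._2, r._3)), CASSANDRA_SCHEMA, table)
//     .foreach(session.execute)
//   session.close()
// }
```

Each partition then executes one statement per (channel, ctid) group instead of one per row, which is where the extra win over the plain foreachPartition version comes from.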
