Spark Cassandra Connector proper usage


Question

I'm looking to use Spark for some ETL, which will mostly consist of "update" statements (a column is a set that will be appended to, so a simple insert is likely not going to work). As such, it seems like issuing CQL queries to import the data is the best option. Using the Spark Cassandra Connector, I see I can do this:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra
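For context, the set-append "update" described above might look something like the following in CQL (the keyspace, table, and column names here are hypothetical, just to illustrate the shape of the statement):

```sql
-- hypothetical schema: a table with a set<text> column named "tags"
UPDATE my_keyspace.things
SET tags = tags + {'new-tag'}
WHERE id = ?;
```

The `set + {value}` form appends to the existing set rather than overwriting it, which is why a plain `INSERT` would not work here.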

Now I don't want to open a session and close it for every row in the source (am I right in not wanting this? Usually, I have one session for the entire process and keep using that in "normal" apps). However, it says that the connector is serializable, but the session obviously is not. So, wrapping the whole import inside a single "withSessionDo" seems like it will cause problems. I was thinking of using something like this:

class CassandraStorage(conf: SparkConf) {
  val session = CassandraConnector(conf).openSession()
  def store(t: Thingy): Unit = {
    // session.execute(cql) goes here
  }
}

Is this a good approach? Do I need to worry about closing the session? Where / how best would I do that? Any pointers are appreciated.

Answer

You actually do want to use withSessionDo, because it won't actually open and close a session on every access. Under the hood, withSessionDo accesses a JVM-level session. This means you will only have one session object per cluster configuration per node.

This means that this code:

val connector = CassandraConnector(sc.getConf)
sc.parallelize(1L to 10000000L).map(x => connector.withSessionDo(session => stuff))

will only ever create one Cluster and Session object per executor JVM, regardless of how many cores each machine has.

For efficiency, I would still recommend using mapPartitions to minimize cache checks.

sc.parallelize(1L to 10000000L)
  .mapPartitions(it => connector.withSessionDo(session =>
    it.map(row => stuff))) // do stuff with the session and row here

In addition, the session object also uses a prepare cache, which lets you cache a prepared statement in your serialized code; it will only ever be prepared once per JVM (all other calls will return the cached reference).
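Putting these pieces together, a sketch of the import might look like the following. This is only an illustration, not the definitive implementation: the keyspace, table, and column names are hypothetical, it assumes the spark-cassandra-connector and the Cassandra Java driver are on the classpath, and it requires a live cluster to actually run:

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical sketch: append a tag to a set<text> column for each row,
// one session per executor JVM, one prepare per JVM via the prepare cache.
def storeAll(sc: SparkContext, rows: RDD[(String, String)]): Unit = {
  val connector = CassandraConnector(sc.getConf)
  rows.foreachPartition { it =>
    connector.withSessionDo { session =>
      // prepare() hits the per-JVM cache after the first call
      val stmt = session.prepare(
        "UPDATE my_keyspace.things SET tags = tags + ? WHERE id = ?")
      it.foreach { case (id, tag) =>
        session.execute(stmt.bind(java.util.Collections.singleton(tag), id))
      }
    }
  }
}
```

Since the writes are side effects, foreachPartition is used here instead of mapPartitions; the withSessionDo call per partition is cheap because it reuses the cached JVM-level session.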
