How to use the Spark Cassandra connector API in Scala


Problem description

My previous post: fixing the prepared statement warning.

I was not able to solve it with the few suggestions I received, so I tried using the Spark Cassandra connector instead. But I am completely confused about how to use it in my application. I tried to write code as below, but I'm not sure how exactly to use the APIs.

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "1.1.1.1")
  .set("spark.cassandra.auth.username", "auser")
  .set("spark.cassandra.auth.password", "apass")
  .set("spark.cassandra.connection.port", "9042")

val sc = new SparkContext(conf)

val c = CassandraConnector(sc.getConf)
c.withSessionDo { session =>
  val insertStatement = prepareStatement(session, insertQuery)
  val boundStatement = new BoundStatement(insertStatement)
  batch.add(boundStatement.bind(data.service_id, data.asset_id, data.summ_typ, data.summ_dt, data.trp_summ_id, data.asset_serial_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.device_id, data.device_serial_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa, data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur, data.st_addr, data.en_addr))
}

def prepareStatement(session: Session, query: String): PreparedStatement = {
  // Cache prepared statements per cluster; the cluster name comes from
  // the Java driver's Session API (session.getCluster.getClusterName).
  val cluster = session.getCluster.getClusterName
  get(cluster, query) match {
    case Some(stmt) => stmt
    case None =>
      synchronized {
        get(cluster, query) match {
          case Some(stmt) => stmt
          case None =>
            val stmt = session.prepare(query)
            put(cluster, query, stmt)
            stmt // return the freshly prepared statement
        }
      }
  }
}


OR

val table1 = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("spark.cassandra.auth.username", "apoch_user")
  .option("spark.cassandra.auth.password", "Apoch#123")
  .options(Map(
    "table"    -> "trip_summary_data",
    "keyspace" -> "aphoc",
    "cluster"  -> "Cluster1"
  ))
  .load()


def insert(data: TripHistoryData): Unit = {

  table1.createOrReplaceTempView("inputTable1")

  val df1 = spark.sql("select * from inputTable1 where service_id = ? and asset_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?")
  val df2 = spark.sql("insert into inputTable1 values (data.service_id, data.asset_id, data.summ_typ, data.summ_dt, data.trp_summ_id, data.asset_serial_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.device_id, data.device_serial_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa, data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur, data.st_addr, data.en_addr)")
}

Answer

You need to concentrate on how you process your data in your Spark application, not on how the data are read or written (that matters too, of course, but only once you hit performance problems).

If you're using Spark, then you need to think in Spark terms as you process data in RDDs or DataFrames. In this case you need to use constructs like these (with DataFrames):

import org.apache.spark.sql.cassandra._ // brings cassandraFormat into scope

val df = spark
  .read
  .cassandraFormat("words", "test") // table "words" in keyspace "test"
  .load()
val newDf = df.filter(...) // some operation on source data
newDf.write
  .cassandraFormat("words_copy", "test")
  .save()
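
Applied to the question's use case, the insert function can become a plain DataFrame write, and the connector then prepares and executes the statements itself. A minimal sketch, assuming the aphoc/trip_summary_data names from the question and that TripHistoryData is a case class whose field names match the table's columns (that mapping is an assumption, not confirmed by the question):

import org.apache.spark.sql.SparkSession

def insert(spark: SparkSession, data: TripHistoryData): Unit = {
  import spark.implicits._

  // Build a one-row DataFrame from the case class and append it to Cassandra;
  // no manual PreparedStatement or BoundStatement is needed.
  Seq(data).toDF()
    .write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "trip_summary_data", "keyspace" -> "aphoc"))
    .mode("append") // appends rows; Cassandra treats inserts as upserts by key
    .save()
}

The read side works the same way: conditions such as service_id = ... are expressed with df.filter, and the connector pushes eligible predicates down to Cassandra.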

And avoid the direct use of session.prepare/session.execute, cluster.connect, etc. - the Spark connector will do the prepare, and other optimizations, under the hood.
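
The same applies to the RDD API: saveToCassandra (from com.datastax.spark.connector._) handles the prepare/bind/execute cycle internally. A minimal sketch under the same assumptions as above:

import com.datastax.spark.connector._ // adds saveToCassandra to RDDs

// The connector prepares the insert and batches the bound statements
// itself; there is no session handling in user code.
val trips = sc.parallelize(Seq(data)) // RDD[TripHistoryData]
trips.saveToCassandra("aphoc", "trip_summary_data")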
