How to change the datatypes of records inserted into Cassandra using Spark Structured Streaming with a Foreach sink


Question

I am trying to insert deserialized Kafka records into DataStax Cassandra using Spark Structured Streaming with a Foreach sink.

For example, all the fields in my deserialized DataFrame arrive in string format:

id    name    date
100   'test'  sysdate

Using the Foreach sink, I created a class and am trying to insert the records by converting them, as below:

session.execute(
  s"""insert into ${cassandraDriver.namespace}.${cassandraDriver.brand_dub_sink} (id, name, date)
      values ('${row.getAs[Long](0)}', '${rowstring(1)}', '${rowstring(2)}')""")

I followed this project exactly: https://github.com/epishova/Structured-Streaming-Cassandra-Sink/blob/master/src/main/scala/cassandra_sink.scala

When inserting into the Cassandra table, the string "id" column is not converted to Long as shown above, and the insert throws the error:

"Invalid STRING constant (100) for "id" of type bigint"

The Cassandra table:

create table test(
id bigint,
name text,
date timestamp)
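
For reference, the interpolated statement that actually reaches Cassandra expands to roughly the following (values taken from the sample row above; illustrative only):

insert into test (id, name, date) values ('100', 'test', 'sysdate');

The single quotes around 100 make it a CQL string constant, which cannot be assigned to the bigint column "id".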

Any suggestions for converting the string datatype to Long inside "def process"?

Any alternative suggestion would also be great. Thanks.

Here is the code:

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.expr

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
  // This class implements the interface ForeachWriter, which has methods that get called 
  // whenever there is a sequence of rows generated as output

  var cassandraDriver: CassandraDriver = null;
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    true
  }

  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    if (cassandraDriver == null) {
      cassandraDriver = new CassandraDriver();
    }
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt)
       values('${record.getLong(0)}', '${record(1)}', '${record(2)}')""")
    )
  }

  def close(errorOrNull: Throwable): Unit = {
    // close the connection
    println(s"Close connection")
  }
}

class SparkSessionBuilder extends Serializable {
  // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. 
  // Note here the usage of @transient lazy val 
  def buildSparkSession: SparkSession = {
    @transient lazy val conf: SparkConf = new SparkConf()
    .setAppName("Structured Streaming from Kafka to Cassandra")
    .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com")
    .set("spark.sql.streaming.checkpointLocation", "checkpoint")

    @transient lazy val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

    spark
  }
}

class CassandraDriver extends SparkSessionBuilder {
  // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor.
  // It extends SparkSessionBuilder so to use the same SparkSession on each node.
  val spark = buildSparkSession

  import spark.implicits._

  val connector = CassandraConnector(spark.sparkContext.getConf)

  // Define Cassandra's table which will be used as a sink
  /* For this app I used the following table:
       CREATE TABLE fx.spark_struct_stream_sink (
       id Bigint,
       name text,
       timestamp_dt date,
       primary key (id));
  */
  val namespace = "fx"
  val foreachTableSink = "spark_struct_stream_sink"
}

object KafkaToCassandra extends SparkSessionBuilder {
  // Main body of the app. It also extends SparkSessionBuilder.
  def main(args: Array[String]) {
    val spark = buildSparkSession

    import spark.implicits._

    // Define location of Kafka brokers:
    val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092"

    /*Here is an example message which I get from a Kafka stream. It contains multiple jsons separated by \n
    {"100": "test1", "01-mar-2018"}
    {"101": "test2", "02-mar-2018"}  */
    val dfraw = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", "currency_exchange")
    .load()

    val schema = StructType(
      Seq(
        StructField("id", StringType, false),
        StructField("name", StringType, false),
StructField("date", StringType, false)

      )
    )

    val df = dfraw
    .selectExpr("CAST(value AS STRING)").as[String]
    .flatMap(_.split("\n"))

    val jsons = df.select(from_json($"value", schema) as "data").select("data.*")


    val sink = jsons
    .writeStream
    .queryName("KafkaToCassandraForeach")
    .outputMode("update")
    .foreach(new CassandraSinkForeach())
    .start()

    sink.awaitTermination()
  }
}  

My modified code:

  var cassandraDriver: CassandraDriver = null;

  def open(partitionId: Long, version: Long): Boolean = {
    // open connection; initialize the driver once per partition
    println(s"in my Open connection")
    cassandraDriver = new CassandraDriver();  // assign the field: a local `val` here would be discarded
    true
  }

  def process(record: Row) = {

    val optype = record(0)

    // defensive re-initialization in case open() was not called
    if (cassandraDriver == null) {
      cassandraDriver = new CassandraDriver();
    }

  if (optype == "I" || optype == "U") {

        println(s"Process insert or Update Idempotent new $record")

        cassandraDriver.connector.withSessionDo(session =>{
          val prepare_rating_brand = session.prepare(s"""insert into ${cassandraDriver.namespace}.${cassandraDriver.brand_dub_sink} (table_name,op_type,op_ts,current_ts,pos,brand_id,brand_name,brand_creation_dt,brand_modification_dt,create_date) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""")

          session.execute(prepare_rating_brand.bind(record.getAs[String](0),record.getAs[String](1),record.getAs[String](2),record.getAs[String](3),record.getAs[String](4),record.getAs[BigInt](5),record.getAs[String](6),record.getAs[String](7),record.getAs[String](8),record.getAs[String](9))
          )

        })
      }  else if (optype == "D") {

        println(s"Process delete new $record")
        cassandraDriver.connector.withSessionDo(session =>
          session.execute(s"""DELETE FROM ${cassandraDriver.namespace}.${cassandraDriver.brand_dub_sink} WHERE brand_id = ${record.getAs[Long](5)}"""))

      } else if (optype == "T") {
        println(s"Process Truncate new $record")
        cassandraDriver.connector.withSessionDo(session =>
          session.execute(s"""Truncate table  ${cassandraDriver.namespace}.${cassandraDriver.plan_rating_archive_dub_sink}"""))

      }
    }

  def close(errorOrNull: Throwable): Unit = {
    // close the connection
    println(s"Close connection")
  }


}

Answer

Your error is that you specify the value for the id field as '${row.getAs[Long](0)}': you've added single quotes around it, so it is treated as a string, not as a long/bigint. Just remove the single quotes around this value: ${row.getAs[Long](0)}...
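
Applied to the snippet from the question, the corrected statement would look like this (a sketch using the same names as the question's code; note there are no quotes around the first value):

session.execute(
  s"""insert into ${cassandraDriver.namespace}.${cassandraDriver.brand_dub_sink} (id, name, date)
      values (${row.getAs[Long](0)}, '${rowstring(1)}', '${rowstring(2)}')""")

Because the value is spliced into the statement text, the unquoted 100 is parsed by Cassandra as a bigint literal, so no conversion is needed on the Scala side for this particular case.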

Also, for performance reasons, it's better to move the instantiation of the Cassandra driver and the prepared statement into the open method, and then reuse the prepared statement, something like this:

  // requires: import com.datastax.driver.core.PreparedStatement
  var cassandraDriver: CassandraDriver = null;
  var preparedStatement: PreparedStatement = null;

  def open(partitionId: Long, version: Long): Boolean = {
    // open the connection and prepare the statement once per partition
    println(s"Open connection")
    cassandraDriver = new CassandraDriver();
    preparedStatement = cassandraDriver.connector.withSessionDo(session =>
      session.prepare(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink}
       (fx_marker, timestamp_ms, timestamp_dt) values (?, ?, ?)""")
    )
    true
  }

  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    cassandraDriver.connector.withSessionDo(session =>
      // bind raw values instead of interpolating them into the CQL text;
      // the bigint column needs a java.lang.Long, the rest are assumed to be strings
      session.execute(preparedStatement.bind(
        record.getLong(0): java.lang.Long,
        record.getString(1), record.getString(2)))
    )
  }

This will be more performant, and you won't need to quote the values yourself.
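
As an alternative (a sketch, not part of the original answer): you can cast the columns in the streaming DataFrame itself, so every row reaching the sink already carries the proper types. The date pattern "dd-MMM-yyyy" below is an assumption based on the sample messages ("01-mar-2018"):

    // Cast in the DataFrame so the ForeachWriter receives typed values.
    // Assumes the `jsons` DataFrame and string schema from the question.
    val typed = jsons
      .withColumn("id", $"id".cast("long"))
      .withColumn("date", to_timestamp($"date", "dd-MMM-yyyy"))

    val sink = typed
      .writeStream
      .queryName("KafkaToCassandraForeach")
      .outputMode("update")
      .foreach(new CassandraSinkForeach())
      .start()

Inside process you can then call record.getLong(0) directly instead of parsing strings.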

