Issue while storing data from Spark-Streaming to Cassandra


Problem description

A Spark Streaming context reads a stream from RabbitMQ with a 30-second batch interval. I want to modify the values of a few columns of the corresponding rows already stored in Cassandra and then write the data back to Cassandra. For that I need to check whether a row for the particular primary key exists in Cassandra, and if it does, fetch it and do the necessary operations. The problem is that I create the StreamingContext on the driver while the actions are performed on the workers, so the workers cannot get hold of the StreamingContext object: it is not serialized and shipped to them, and I get this error: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext. I also know that we cannot access the StreamingContext inside foreachRDD. How do I achieve the same functionality without getting the serialization error?

I have looked at a few examples here but they didn't help.

Here is the code snippet:

   val ssc = new StreamingContext(sparkConf, Seconds(30))
   val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
   receiverStream.start()
   val lines = receiverStream.map(EventData.fromString(_))
   lines.foreachRDD { x =>
     if (x.toLocalIterator.nonEmpty) {
       x.foreachPartition { it =>
         for (tuple <- it) {
           val cookieid  = tuple.cookieid
           val sessionid = tuple.sessionid
           val logdate   = tuple.logdate
           val EventRows = ssc.cassandraTable("SparkTest", CassandraTable)
             .select("*")
             .where("cookieid = '" + cookieid + "' and logdate = '" + logdate +
                    "' and sessionid = '" + sessionid + "'")

           // some logic here to check whether the row exists for this cookieid

         }
       }
     }
   }

Recommended answer

The SparkContext cannot be serialized and passed across multiple workers, which may run on different nodes. If you need to do something like this you can use foreachPartition or mapPartitions; otherwise, do it within the function that gets passed around:

 CassandraConnector(SparkWriter.conf).withSessionDo { session =>
   ....
   session.executeAsync(<CQL Statement>)
 }
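
Putting this together for the question above, here is a minimal sketch of how the per-row existence check and write-back could be done inside foreachPartition without touching the StreamingContext. It assumes the keyspace SparkTest, a table name held in CassandraTable, and the cookieid/sessionid/logdate fields from the question's EventData; somecolumn and newValue are hypothetical placeholders for whatever you actually update:

   import com.datastax.spark.connector.cql.CassandraConnector

   // CassandraConnector itself is serializable, so it can be captured by the
   // closures that Spark ships to the workers (unlike the StreamingContext).
   val connector = CassandraConnector(ssc.sparkContext.getConf)

   lines.foreachRDD { rdd =>
     rdd.foreachPartition { events =>
       // Open one session per partition and reuse it for every event in it.
       connector.withSessionDo { session =>
         events.foreach { e =>
           val existing = session.execute(
             "SELECT * FROM SparkTest." + CassandraTable +
               " WHERE cookieid = ? AND logdate = ? AND sessionid = ?",
             e.cookieid, e.logdate, e.sessionid)
           if (existing.one() != null) {
             // Row exists: update the columns of interest and write back.
             // "somecolumn" and newValue are placeholders, not real column names.
             session.execute(
               "UPDATE SparkTest." + CassandraTable +
                 " SET somecolumn = ? WHERE cookieid = ? AND logdate = ? AND sessionid = ?",
               newValue, e.cookieid, e.logdate, e.sessionid)
           }
         }
       }
     }
   }

Because the connector is built from the SparkConf and the session is only opened inside the partition closure, nothing non-serializable has to cross the driver/worker boundary.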

And in the SparkConf you need to give the Cassandra details:

  val conf = new SparkConf()
    .setAppName("test")
    .set("spark.ui.enabled", "true")
    .set("spark.executor.memory", "8g")
    //  .set("spark.executor.core", "4")
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.dir", "/ephemeral/spark-events")
    //to avoid disk space issues - default is /tmp
    .set("spark.local.dir", "/ephemeral/spark-scratch")
    .set("spark.cleaner.ttl", "10000")
    .set("spark.cassandra.connection.host", cassandraip)
    .setMaster("spark://10.255.49.238:7077")

The Java CSVParser is a library that is not serializable, so Spark cannot ship it to the (possibly different) nodes if you call map or foreach on the RDD. One workaround is to use mapPartitions, in which case one full partition is processed on one Spark node, so the parser need not be serialized for each call. Example:

 val rdd_inital_parse = rdd.mapPartitions(pLines)

 def pLines(lines: Iterator[String]) = {
   val parser = new CSVParser()  // cannot be serialized; would fail if used with rdd.map(pLines)
   lines.map(x => parseCSVLine(x, parser.parseLine))
 }
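
For completeness, a sketch of the parseCSVLine helper assumed by the snippet above; the name comes from that snippet, and the column order is a hypothetical mapping onto the EventData fields used in the question:

   // Hypothetical helper: parses one CSV line into an EventData record.
   // "parse" is the (non-serializable) parser's parseLine method, passed in so the
   // parser itself stays inside the mapPartitions closure.
   def parseCSVLine(line: String, parse: String => Array[String]): EventData = {
     val cols = parse(line)
     EventData(cookieid = cols(0), sessionid = cols(1), logdate = cols(2))
   }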
