Reading from Cassandra using Spark Streaming


Question

I have a problem when I use Spark Streaming to read from Cassandra.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

Following the link above, I use

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)

to select the data from Cassandra, but it seems that Spark Streaming runs the query only once; I want it to keep querying at a 10-second interval.

My code is as follows; I look forward to your response.

Thanks!

import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("scala_streaming_test")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val ssc = new StreamingContext(conf, Seconds(10))

    val rdd = ssc.cassandraTable("mykeyspace", "users")
      .select("fname", "lname")
      .where("lname = ?", "yu")

    // rdd.collect().foreach(println)

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()

    val dstream = ssc.queueStream(rddQueue)
    dstream.print()

    ssc.start()
    rdd.collect().foreach(println)
    rddQueue += rdd
    ssc.awaitTermination()
  }
}

Answer

You can create a ConstantInputDStream with the CassandraRDD as input. ConstantInputDStream provides the same RDD on each streaming interval, and executing an action on that RDD triggers materialization of the RDD lineage, which runs the query against Cassandra every time.

Make sure that the data being queried does not grow unbounded, to avoid increasing query times and an unstable streaming process. (One way to bound each batch is sketched after the example below.)

Something like this should do the trick (using your code as a starting point):

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.ConstantInputDStream
import com.datastax.spark.connector.streaming._

val ssc = new StreamingContext(conf, Seconds(10))

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users")
  .select("fname", "lname")
  .where("lname = ?", "yu")

val dstream = new ConstantInputDStream(ssc, cassandraRDD)

dstream.foreachRDD { rdd =>
  // Any action will trigger the underlying Cassandra query;
  // collect is used here only to produce simple output.
  println(rdd.collect.mkString("\n"))
}
ssc.start()
ssc.awaitTermination()
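
If the table does grow over time, one way to keep each batch bounded is to rebuild the query inside foreachRDD with a moving time window, so each interval only scans rows newer than the previous batch. The sketch below is hypothetical: the "events" table and its "ts" timestamp column are assumptions, and the range predicate must be valid CQL for your schema (e.g. ts is a clustering column); adapt it to your own tables.

import com.datastax.spark.connector._ // enables sparkContext.cassandraTable

// Driver-side watermark: foreachRDD bodies run on the driver, so a plain var works here.
var lastTs = System.currentTimeMillis()

dstream.foreachRDD { _ =>
  val now = System.currentTimeMillis()
  // Re-create the Cassandra RDD each batch so only the (lastTs, now] window is scanned.
  val batch = ssc.sparkContext
    .cassandraTable("mykeyspace", "events") // hypothetical table
    .where("ts > ? AND ts <= ?", new java.util.Date(lastTs), new java.util.Date(now))
  println(batch.collect.mkString("\n"))
  lastTs = now
}

This keeps the amount of data read per interval roughly constant instead of re-reading the whole table on every batch.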

