How to load history data when starting Spark Streaming process, and calculate running aggregations

Problem description

I have some sales-related JSON data in my ElasticSearch cluster, and I would like to use Spark Streaming (using Spark 1.4.1) to dynamically aggregate incoming sales events from my eCommerce website via Kafka, to have a current view of the user's total sales (in terms of revenue and products).

What's not really clear to me from the docs I've read is how I can load the history data from ElasticSearch when the Spark application starts, and then calculate, for example, the overall revenue per user (based on the history plus the incoming sales from Kafka).

I have the following (working) code to connect to my Kafka instance and receive the JSON documents:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

object ReadFromKafka {
  def main(args: Array[String]) {

    val checkpointDirectory = "/tmp"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
    val topicsSet = Array("tracking").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Create direct kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.foreachRDD { rdd =>

      //If data is present, continue
      if (rdd.count() > 0) {

        //Create SQLContext and parse JSON
        val sqlContext = new SQLContext(sc)
        val trackingEvents = sqlContext.read.json(rdd.values)

        //Sample aggregation of incoming data
        trackingEvents.groupBy("type").count().show()

      }

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

I know that there's a plugin for ElasticSearch (https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read), but it's not really clear to me how to integrate the read at startup with the streaming calculation, so that the history data gets aggregated together with the streaming data.

Help is much appreciated! Thanks in advance.

Recommended answer

RDDs are immutable, so after they are created you cannot add data to them, for example updating the revenue with new events.

What you can do is union the existing data with the new events to create a new RDD, which you can then use as the current total. For example...

var currentTotal: RDD[(Key, Value)] = ... //read from ElasticSearch
messages.foreachRDD { rdd =>
    currentTotal = currentTotal.union(rdd)
}

In this case we make currentTotal a var since it will be replaced by the reference to the new RDD when it gets unioned with the incoming data.
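
As for the elided "read from ElasticSearch" part, one way to seed that initial RDD is the elasticsearch-hadoop connector the question already links to. A minimal sketch, assuming a hypothetical index/type "sales/event" with "userId" and "revenue" fields (these names are made up for illustration):

import org.elasticsearch.spark._   // elasticsearch-hadoop: adds esRDD() to the SparkContext

// Sketch only: read the sales history once at startup and map each document
// to a (userId, revenue) pair. The index/type and field names are hypothetical;
// the connector picks up "es.nodes" from the SparkConf (it defaults to localhost).
val history = sc.esRDD("sales/event").map { case (_, doc) =>
  (doc("userId").toString, doc("revenue").toString.toDouble)
}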

After the union you may want to perform some further operations such as reducing the values which belong to the same Key, but you get the picture.
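
For example, a minimal sketch of that per-key reduction, assuming each Kafka message can be parsed into a (userId, revenue) pair (parseUserRevenue is a hypothetical helper, not code from the question):

import org.apache.spark.rdd.RDD

// Sketch only: keep one running revenue figure per user.
// parseUserRevenue is a hypothetical parser returning Option[(String, Double)].
var currentTotal: RDD[(String, Double)] = history  // seeded from ElasticSearch at startup

messages.foreachRDD { rdd =>
  val batchRevenue = rdd.values.flatMap(json => parseUserRevenue(json))

  // Union the new batch with the running total, then collapse duplicate user keys.
  currentTotal = currentTotal.union(batchRevenue).reduceByKey(_ + _)
}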

If you use this technique, note that the lineage of your RDDs will grow, as each newly created RDD references its parent. This can cause a stack-overflow-style lineage problem. To fix this you can call checkpoint() on the RDD periodically.
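
A sketch of how that periodic checkpointing might look inside the same loop (the counter and the interval of ten batches are arbitrary choices, not part of the original answer):

// Sketch only: checkpoint the running total every few batches to keep the lineage short.
sc.setCheckpointDir(checkpointDirectory)   // once, before ssc.start()
var batchCount = 0L                        // declared outside foreachRDD

messages.foreachRDD { rdd =>
  // ...update currentTotal as in the snippets above...
  batchCount += 1
  if (batchCount % 10 == 0) {
    currentTotal.checkpoint()  // mark the accumulated RDD for checkpointing
    currentTotal.count()       // an action forces the checkpoint to be written
  }
}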
