How to load history data when starting Spark Streaming process, and calculate running aggregations

Question

I have some sales-related JSON data in my ElasticSearch cluster, and I would like to use Spark Streaming (using Spark 1.4.1) to dynamically aggregate incoming sales events from my eCommerce website via Kafka, to have a current view of each user's total sales (in terms of revenue and products).

What's not really clear to me from the docs I read is how I can load the history data from ElasticSearch upon the start of the Spark application, and then calculate, for example, the overall revenue per user (based on the history and the incoming sales from Kafka).

I have the following (working) code to connect to my Kafka instance and receive the JSON documents:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

object ReadFromKafka {
  def main(args: Array[String]) {

    val checkpointDirectory = "/tmp"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
    val topicsSet = Array("tracking").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Create direct kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.foreachRDD { rdd =>

      //If data is present, continue
      if (rdd.count() > 0) {

        //Create SQLContext and parse JSON
        val sqlContext = new SQLContext(sc)
        val trackingEvents = sqlContext.read.json(rdd.values)

        //Sample aggregation of incoming data
        trackingEvents.groupBy("type").count().show()

      }

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

I know that there's a plugin for ElasticSearch (https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read), but it's not really clear to me how to integrate the read at startup with the streaming calculation process, so that the history data gets aggregated together with the streaming data.

Help is much appreciated! Thanks in advance.

Answer

RDDs are immutable, so once they have been created you cannot add data to them, for example to update the revenue with new events.

What you can do is union the existing data with the new events to create a new RDD, which you can then use as the current total. For example...

var currentTotal: RDD[(Key, Value)] = ... // read from ElasticSearch
messages.foreachRDD { rdd =>
  currentTotal = currentTotal.union(rdd)
}
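
To make the load-at-startup part concrete, here is a minimal sketch that uses the esRDD reader from the elasticsearch-spark plugin linked in the question. The index name sales/event, the userId and revenue fields, and the parseSale helper are assumptions for illustration (adapt them to your mapping); sc and messages are the values from the question's code, and the ES connection settings (es.nodes etc.) are assumed to be set on the SparkConf.

import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._ // adds sc.esRDD (elasticsearch-spark plugin)

// Load the historical sales once, at startup, and pre-aggregate per user.
// Assumption: a "sales/event" index whose documents contain "userId" and "revenue".
var currentTotal: RDD[(String, Double)] = sc.esRDD("sales/event")
  .map { case (_, doc) => (doc("userId").toString, doc("revenue").toString.toDouble) }
  .reduceByKey(_ + _)

messages.foreachRDD { rdd =>
  if (rdd.count() > 0) {
    // parseSale is a hypothetical helper that turns one Kafka JSON message
    // into a (userId, revenue) pair for your particular message format.
    val newSales: RDD[(String, Double)] = rdd.values.map(parseSale)
    currentTotal = currentTotal.union(newSales)
  }
}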

In this case we make currentTotal a var since it will be replaced by the reference to the new RDD when it gets unioned with the incoming data.

After the union you may want to perform some further operations such as reducing the values which belong to the same Key, but you get the picture.
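
For the revenue-per-user example, that reduction collapses all (userId, revenue) pairs into one running total per user. Continuing the sketch above, the last line of the foreachRDD body becomes:

    currentTotal = currentTotal.union(newSales).reduceByKey(_ + _)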

If you use this technique, note that the lineage of your RDDs will grow, as each newly created RDD will reference its parent. This can cause a stack-overflow-style lineage problem. To avoid this, you can call checkpoint() on the RDD periodically.
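
A minimal sketch of such periodic checkpointing, reusing the checkpointDirectory value from the question's code; the interval of 10 batches is an arbitrary choice:

sc.setCheckpointDir(checkpointDirectory)

var batchCount = 0L
messages.foreachRDD { rdd =>
  // ... union and reduce as above ...
  batchCount += 1
  if (batchCount % 10 == 0) {
    // Mark the running total for checkpointing; the next action on it
    // materializes the checkpoint and truncates the accumulated lineage.
    currentTotal.checkpoint()
    currentTotal.count()
  }
}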
