批次之间的Spark流数据共享 [英] Spark streaming data sharing between batches

查看:212
本文介绍了批次之间的Spark流数据共享的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

火花流以微批处理方式处理数据.

Spark streaming processes the data in micro batches.

每个间隔数据都是使用RDD并行处理的,每个间隔之间没有任何数据共享.

Each interval data is processed in parallel using RDDs with out any data sharing between each interval.

但是我的用例需要在间隔之间共享数据.

But my use case needs to share the data between intervals.

考虑

Consider the Network WordCount example which produces the count of all words received in that interval.

我将如何产生以下单词计数?

How would I produce following word count ?

  • 单词"hadoop"和"spark"具有相对间隔计数的相对计数

  • Relative count for the words "hadoop" and "spark" with the previous interval count

所有其他单词的正常单词计数.

Normal word count for all other words.

注意:UpdateStateByKey进行有状态处理,但这将功能应用于每个记录而不是特定记录.

Note: UpdateStateByKey does the Stateful processing but this applies function on every record instead of particular records.

因此,UpdateStateByKey不符合此要求.

So, UpdateStateByKey doesn't fit for this requirement.

更新:

考虑以下示例

时间间隔1

输入:

Sample Input with Hadoop and Spark on Hadoop

输出:

hadoop  2
sample  1
input   1
with    1
and 1
spark   1
on  1

间隔2

输入:

Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark

输出:

another 3
hadoop  1
spark   2
and 2
sample  1
input   1
with    1
on  1

说明:

第一个间隔给出所有单词的正常单词计数.

1st interval gives the normal word count of all words.

在第二个时间间隔中出现了3次hadoop,但输出应为1(3-2)

In the 2nd interval hadoop occurred 3 times but the output should be 1 (3-2)

发生了3次火花,但输出应为2(3-1)

Spark occurred 3 times but the output should be 2 (3-1)

对于所有其他单词,应该给出正常的单词计数.

For all other words it should give the normal word count.

因此,在处理第二个间隔数据时,其第一个间隔的字数应为 hadoop spark

So, while processing 2nd Interval data, it should have the 1st interval's word count of hadoop and spark

这是一个带有插图的简单示例.

This is a simple example with illustration.

在实际使用情况下,需要数据共享的字段是RDD元素(RDD)的一部分,并且无需跟踪任何巨大的值.

In actual use case, fields that need data sharing are part of the RDD element(RDD) and huge no of values needs to be tracked.

即,在此示例中,像hadoop和spark这样的关键字将要跟踪近10万个关键字.

i.e, in this example like hadoop and spark keywords nearly 100k keywords to be tracked.

Apache Storm中的类似用例:

风暴中的分布式缓存

暴风雨TransactionalWords

推荐答案

这可以通过记住"接收到的最后一个RDD,并使用左连接将数据与下一个流批处理合并来实现.我们使用streamingContext.remember使流处理过程中产生的RDD保留在我们需要的时间内.

This is possible by "remembering" the last RDD received and using a left join to merge that data with the next streaming batch. We make use of streamingContext.remember to enable RDDs produced by the streaming process to be kept for the time we need them.

我们利用了dstream.transform是在驱动程序上执行的操作的事实,因此我们可以访问所有本地对象定义.特别是,我们希望使用每个批次上的必需值来更新对最后一个RDD的可变引用.

We make use of the fact that dstream.transform is an operation that executes on the driver and therefore we have access to all local object definitions. In particular we want to update the mutable reference to the last RDD with the required value on each batch.

可能有一段代码使这个想法更加清楚:

Probably a piece of code makes that idea more clear:

// configure the streaming context to remember the RDDs produced
// choose at least 2x the time of the streaming interval
ssc.remember(xx Seconds)  

// Initialize the "currentData" with an empty RDD of the expected type
var currentData: RDD[(String, Int)] = sparkContext.emptyRDD

// classic word count
val w1dstream = dstream.map(elem => (elem,1))    
val count = w1dstream.reduceByKey(_ + _)    

// Here's the key to make this work. Look how we update the value of the last RDD after using it. 
val diffCount = count.transform{ rdd => 
                val interestingKeys = Set("hadoop", "spark")               
                val interesting = rdd.filter{case (k,v) => interestingKeys(k)}                                
                val countDiff = rdd.leftOuterJoin(currentData).map{case (k,(v1,v2)) => (k,v1-v2.getOrElse(0))}
                currentData = interesting
                countDiff                
               }

diffCount.print()

这篇关于批次之间的Spark流数据共享的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆