Merging micro batches in Spark Streaming


Problem description

(I have little knowledge of batch Spark, and none of Spark Streaming.)

I have a Kafka topic Kafka[(A,B)->X] where (A,B) is the key (A and B are simple numeric types) and X is the message type, which is relatively big (a couple of MB). Setting aside the problem of failures in the input, the data forms a grid: for each a in A, there will be messages (a,b) for all b in B. Moreover, the b's are ordered, and I think we can assume that all messages for one a will arrive in b order (what I know is that the topic is filled in this order).
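
For concreteness, here is a minimal Scala sketch of the shapes described above; every name in it (GridTypes, A, B, X, Y) is a hypothetical stand-in, not something taken from the actual topic:

// Minimal sketch of the assumed shapes only; all names here are hypothetical.
object GridTypes {
  type A = Long                 // first key component, a simple numeric type
  type B = Long                 // second key component, ordered within each a
  type X = Array[Byte]          // large input payload, a couple of MB
  type Y = Array[Byte]          // result of the per-message step

  // The topic would then be consumed as a DStream[((A, B), X)], e.g. via
  // KafkaUtils.createDirectStream (exact signature depends on connector version).
}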

Then I need to process the messages as follows:

  1. a (couple of) function is applied to each message (a,b)->x, outputting (a,b)->y
  2. a function should be applied to the messages aB->Seq[y], where aB = {(a,b) for all b in B}

(And later there is a pass where the messages need to be "transposed" to be processed across all a's, but that's not the question here.)
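
A minimal sketch of what the two steps above could look like on a pair DStream, assuming the hypothetical types from the earlier sketch and ignoring for now the cross-batch problem raised below; f and g are stand-ins for the real functions:

import org.apache.spark.streaming.dstream.DStream

// Step 1: apply f to every message independently, keeping the (a, b) key.
def step1(input: DStream[((Long, Long), Array[Byte])],
          f: Array[Byte] => Array[Byte]): DStream[((Long, Long), Array[Byte])] =
  input.mapValues(f)

// Step 2: regroup by a and hand g all y's of one a, ordered by b.
// Note: on a DStream this grouping happens per micro-batch, which is exactly
// the limitation discussed next.
def step2(ys: DStream[((Long, Long), Array[Byte])],
          g: Seq[Array[Byte]] => Array[Byte]): DStream[(Long, Array[Byte])] =
  ys.map { case ((a, b), y) => (a, (b, y)) }
    .groupByKey()
    .mapValues(grouped => g(grouped.toSeq.sortBy(_._1).map(_._2)))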

How can I achieve such a merge of messages, from step 1 to step 2?

It looks like a groupBy over the sub-key a, but to my understanding groupBy would be applied per micro-batch. What I need is, for each a, to wait until all b's have been received (assume a simple counting scheme would work), once again setting aside missing b's and errors in the input data.
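
One streaming-native way to express "for each a, wait until all b's have arrived" is to keep state per a across micro-batches, for example with updateStateByKey. A hedged sketch, assuming the number of b's per a (expectedB) is known up front and that ssc.checkpoint(...) has been set, as stateful operations require:

import org.apache.spark.streaming.dstream.DStream

// Collect (b, y) pairs per a across micro-batches and release a group only
// once every b has been seen. expectedB is an assumed, known grid width.
def collectPerA(step1Output: DStream[((Long, Long), Array[Byte])],
                expectedB: Int): DStream[(Long, Seq[(Long, Array[Byte])])] = {
  val withState = step1Output
    .map { case ((a, b), y) => (a, Seq((b, y))) }
    .updateStateByKey[Seq[(Long, Array[Byte])]] { (batchPairs, state) =>
      Some(state.getOrElse(Seq.empty[(Long, Array[Byte])]) ++ batchPairs.flatten)
    }
  // Caveat: a completed a keeps re-emitting on every batch until its state is
  // cleared; a fuller version would drop or flag finished keys (e.g. mapWithState).
  withState.filter { case (_, pairs) => pairs.size == expectedB }
}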

Without knowing better, I would try to see whether such a merge could be achieved by appending to an HDFS file, one per a, and then trigger a second streaming process on those files once they are full, i.e. when a file contains all b's, move it to an input directory for step 2 (a rough sketch of that second, file-driven stream follows the list below). But:

  1. I don't know whether such appends are even possible on HDFS
  2. Two StreamingContexts would need to run in parallel, one per step, which looks like a problem (?)
  3. I understand that going through HDFS would break the "exactly once" property of Spark (Streaming)
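
For completeness, the second, file-driven process mentioned above could watch a hand-off directory with a file-based input stream. A rough sketch under those assumptions (the directory name and the per-line processing are hypothetical):

import org.apache.spark.streaming.StreamingContext

// Second job: each completed per-a file moved into this directory becomes
// step-2 input. textFileStream only picks up files moved in atomically.
def step2FromFiles(ssc: StreamingContext): Unit = {
  val completedDir = "hdfs:///grids/completed"   // hypothetical hand-off directory
  ssc.textFileStream(completedDir).foreachRDD { rdd =>
    // one RDD per batch interval; each file holds all records of a single a
    rdd.foreach(line => println(line.take(80)))  // placeholder for the real aB -> Seq[y] step
  }
}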

Recommended answer

You can create a master RDD, and merge the micro RDDs generated by the stream into the master with RDD.union. Something like:

import org.apache.spark.rdd.RDD

var masterRDD: RDD[((Long, Long), String)] = sc.emptyRDD  // guessing on the RDD type

myStream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // union returns a new RDD, so reassign instead of discarding the result
    masterRDD = masterRDD.union(rdd)

    masterRDD.groupBy(...)....  // then group by the sub-key a and continue
  }
})

You should take some time and read up on checkpointing as well, specifically:

Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.
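
In the union approach above, the master RDD's lineage grows with every batch, so a periodic RDD checkpoint is what cuts it. A hedged sketch reusing the names from the snippet above; the directory and the every-10-batches interval are assumptions:

// Cut the ever-growing union lineage every few batches.
sc.setCheckpointDir("hdfs:///checkpoints/merge-job")   // hypothetical directory

var batchCount = 0L
myStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    masterRDD = masterRDD.union(rdd)
    batchCount += 1
    if (batchCount % 10 == 0) {     // assumed interval
      masterRDD.checkpoint()        // mark for checkpointing to reliable storage
      masterRDD.count()             // force an action so the checkpoint materializes
    }
  }
}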
