Spark Streaming: stateless overlapping windows vs. keeping state


Question

What would be some considerations for choosing stateless sliding-window operations (e.g. reduceByKeyAndWindow) vs. choosing to keep state (e.g. via updateStateByKey or the new mapWithState) when handling a stream of sequential, finite event sessions with Spark Streaming?

For example, consider the following scenario:

A wearable device tracks physical exercises performed by the wearer. The device automatically detects when an exercise starts, and emits a message; emits additional messages while the exercise is in progress (e.g. heart rate); and finally, emits a message when the exercise is done.

The desired result is a stream of aggregated records per exercise session. i.e. all events of the same session should be aggregated together (e.g. so that each session could be saved in a single DB row). Note that each session has a finite length, but the entire stream from multiple devices is continuous. For convenience, let's assume the device generates a GUID for each exercise session.

I can see two approaches for handling this use-case with Spark Streaming:

  1. Using non-overlapping windows, and keeping state. A state is saved per GUID, with all events matching it. When a new event arrives, the state is updated (e.g. using mapWithState), and if the event is "end of exercise session", an aggregated record based on the state is emitted and the key removed.

  2. Using overlapping sliding windows, and keeping only the first sessions. Assume a sliding window of length 2 and interval 1 (see diagram below). Also assume that the window length is 2 X (maximal possible exercise time). On each window, events are aggregated by GUID, e.g. using reduceByKeyAndWindow. Then, all sessions which started in the second half of the window are dumped, and the remaining sessions are emitted. This enables using each event exactly once, and ensures all events belonging to the same session will be aggregated together.
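A minimal sketch of approach #1 with mapWithState (Spark 1.6+). `Event`, `SessionSummary`, and the field names are hypothetical placeholders, not from the question; `stream` is the input DStream of device messages:

```scala
// Hypothetical message/result types -- the real schema depends on the device protocol.
case class Event(guid: String, isEnd: Boolean /*, heartRate, timestamp, ... */)
case class SessionSummary(guid: String, eventCount: Int /*, aggregates ... */)

def trackSession(guid: String,
                 event: Option[Event],
                 state: State[List[Event]]): Option[SessionSummary] = {
  val events = event.toList ++ state.getOption.getOrElse(Nil)
  if (event.exists(_.isEnd)) {
    state.remove()                           // drop the key once the session ends
    Some(SessionSummary(guid, events.size))  // emit one aggregated record per session
  } else {
    state.update(events)                     // keep accumulating in-flight events
    None
  }
}

val summaries = stream
  .map(e => (e.guid, e))
  .mapWithState(StateSpec.function(trackSession _))
  .flatMap(_.toSeq)                          // keep only completed sessions
```

This mirrors the description above: state grows per GUID while the session is open, and is emitted and removed on the "end of exercise session" event.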

Diagram for approach #2:

Only sessions starting in the areas marked with \\\ will be emitted. 
-----------
|window 1 |
|\\\\|    |
-----------
     ----------
     |window 2 |
     |\\\\|    |  
     -----------
          ----------
          |window 3 |
          |\\\\|    |
          -----------
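The window-and-dump logic of approach #2 can be sketched as follows. `Session`, `merge`, and `startTime` are hypothetical placeholders, and a 60-minute maximal exercise time is an assumption; the `transform` variant that receives the batch `Time` is used to compute the window's halfway cutoff:

```scala
val windowed = stream
  .map(e => (e.guid, Session(e)))            // lift each event into a one-event session
  .reduceByKeyAndWindow((a: Session, b: Session) => a.merge(b), // merge keeps the earliest start time
                        Minutes(120),        // window = 2 x maximal exercise time
                        Minutes(60))         // slide  = 1 x maximal exercise time

// Emit only sessions that started in the first half of the window (the \\\ area
// in the diagram); the rest reappear, complete, in the next window.
val emitted = windowed.transform { (rdd, windowEnd) =>
  val cutoff = (windowEnd - Minutes(60)).milliseconds
  rdd.filter { case (_, session) => session.startTime < cutoff }
}
```

The merge function must keep the minimum event timestamp per GUID so that the `startTime` filter can decide which half of the window the session began in.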

Pros and cons I see:

Approach #1 is less computationally expensive, but requires saving and managing state (e.g. if the number of concurrent sessions increases, the state might get larger than memory). However if the maximal number of concurrent sessions is bounded, this might not be an issue.

Approach #2 is twice as expensive (each event is processed twice), and has higher latency (2 X maximal exercise time), but it is simpler and easier to manage, as no state is retained.

What would be the best way to handle this use case - is any of these approaches the "right" one, or are there better ways?

What other pros/cons should be taken into consideration?

Solution

Normally there is no single right approach; each has tradeoffs. I'll therefore add an additional approach to the mix and outline my take on the pros and cons of each, so you can decide which one is more suitable for you.

External state approach (approach #3)

You can accumulate the state of the events in external storage. Cassandra is quite often used for that. You can handle final and ongoing events separately, for example:

val stream = ...

val ongoingEventsStream = stream.filter(e => !isFinalEvent(e))
val finalEventsStream = stream.filter(isFinalEvent)

ongoingEventsStream.foreachRDD { /* accumulate state in Cassandra */ }
finalEventsStream.foreachRDD { /* finalize state in Cassandra, move to final destination if needed */ }

trackStateByKey approach (approach #1.1)

It might potentially be the optimal solution for you, as it removes the drawbacks of updateStateByKey. However, considering it was only just released as part of Spark 1.6 (where it landed under the name mapWithState), it could be risky as well, since for some reason it is not advertised much. You can use the link as a starting point if you want to find out more.
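One detail worth knowing: StateSpec supports an idle timeout, which also mitigates the risk of sessions whose "end" message never arrives, since abandoned keys are evicted automatically. A sketch, where `trackSession` stands for whatever per-key mapping function you use and the 2-hour value is an assumption:

```scala
// trackSession is a hypothetical mapping function with the mapWithState
// signature: (key, Option[value], State[...]) => output.
val spec = StateSpec.function(trackSession _).timeout(Minutes(120))

// A timing-out key receives value = None in the mapping function, with
// state.isTimingOut() == true; state.update()/remove() must not be called there,
// but you can still emit a best-effort summary for the abandoned session.
val summaries = stream.map(e => (e.guid, e)).mapWithState(spec)
```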

Pros/Cons

Approach #1 (updateStateByKey)

Pros

  • Easy to understand or explain (to rest of the team, newcomers, etc.) (subjective)
  • Storage: Better memory usage; stores only the latest state of each exercise
  • Storage: Will keep only ongoing exercises, and discard them as soon as they finish
  • Latency is limited only by performance of each micro-batch processing

Cons

  • Storage: If the number of keys (concurrent exercises) is large, the state may not fit into the memory of your cluster
  • Processing: It will run the updateState function for each key in the state map, so if the number of concurrent exercises is large, performance will suffer

Approach #2 (window)

While it is possible to achieve what you need with windows, it looks significantly less natural in your scenario.

Pros

  • Processing in some cases (depending on the data) might be more efficient than updateStateByKey, since updateStateByKey tends to run the update function on every key even when there are no actual updates

Cons

  • "Maximal possible exercise time" sounds like a huge risk: it could be a fairly arbitrary duration, driven by human behaviour. Some people might forget to "finish" an exercise. It also depends on the kind of exercise, which could range from seconds to hours; you would want lower latency for quick exercises, yet would have to keep latency as high as the longest possible exercise
  • Feels harder to explain to others how it will work (subjective)
  • Storage: Has to keep all data within the window frame, not only the latest state. Memory is also freed only when the window slides past a time slot, not when an exercise actually finishes. This might not make a huge difference if you keep only the last two time slots, but it grows if you try to achieve more flexibility by sliding the window more often.

Approach #3 (external state)

Pros

  • Easy to explain, etc. (subjective)
  • Pure streaming-processing approach, meaning that Spark is responsible for acting on each individual event, but does not try to store state, etc. (subjective)
  • Storage: Not limited by memory of the cluster to store state - can handle huge number of concurrent exercises
  • Processing: State is updated only when there are actual updates to it (unlike updateStateByKey)
  • Latency is similar to updateStateByKey and only limited by the time required to process each micro-batch

Cons

  • Extra component in your architecture (unless you already use Cassandra for your final output)
  • Processing: By default it is slower than processing purely in Spark, since the state is not in memory and the data has to be transferred over the network
  • You will have to implement exactly-once semantics for outputting data into Cassandra (for the case of a worker failure during foreachRDD)
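On the exactly-once point: if you use the DataStax spark-cassandra-connector (an assumption about the stack) and key the table by the session GUID, Cassandra writes are upserts, so replaying a failed micro-batch rewrites the same rows instead of duplicating them. A sketch, with illustrative field and column names:

```scala
import com.datastax.spark.connector._  // DataStax spark-cassandra-connector

ongoingEventsStream.foreachRDD { rdd =>
  // With a primary key of (guid, ts), a batch replayed after a worker failure
  // overwrites the same rows -- an idempotent upsert, not a duplicate insert,
  // so at-least-once delivery becomes effectively exactly-once for this table.
  rdd.map(e => (e.guid, e.timestamp, e.heartRate))
     .saveToCassandra("fitness", "exercise_events",
                      SomeColumns("guid", "ts", "heart_rate"))
}
```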

Suggested approach

I'd try the following:

  • Test the updateStateByKey approach on your data and your cluster
  • See whether memory consumption and processing are acceptable even with a large number of concurrent exercises (expected at peak hours)
  • Fall back to the Cassandra approach if not
