Spark Streaming - Best way to split an input stream based on a filter parameter


Problem description

I am currently trying to create some kind of monitoring solution - some data is written to Kafka, and I read this data with Spark Streaming and process it.

To preprocess the data for machine learning and anomaly detection, I would like to split the stream based on some filter parameters. So far I have learned that a DStream itself cannot be split into several streams.

The main problem I am facing is that many algorithms (such as KMeans) only take continuous data, not discrete data such as a URL or some other string.

My ideal requirements would be:

  • Read data from Kafka and generate a list of strings based on what was read (see the sketch after this list)
  • Generate multiple streams based on that list of strings (split stream, filter stream, or whatever the best practice is)
  • Use those streams to train a different model for each stream to get a baseline, then compare everything that comes later against that baseline
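
A minimal sketch of the reading step, assuming the spark-streaming-kafka-0-10 integration; the topic name, the consumer settings, and the ssc StreamingContext are assumptions, not from the question:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Hypothetical consumer configuration - adjust servers, group id, topic, etc.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "monitoring",
  "auto.offset.reset" -> "latest"
)

// ssc: an existing StreamingContext (assumed); the stream carries the raw record values
val originalDStream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("events"), kafkaParams)
).map(record => record.value)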

I would be happy to get any suggestions on how to approach this problem. I cannot imagine that this scenario is not covered in Spark - however, so far I have not found a working solution.

Recommended answer

I think it should be sufficient to create derived DStreams from the original, using filter and map:

// predicate, predicate2, extractFeatures and extractOtherFeatures stand in for the actual logic
val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e))
val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e))
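
Each derived DStream re-reads the source when it is materialized, so when several branches are created from the same stream it pays to cache the original, as the last snippet below does.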

Note that these filter and map steps could be combined into a single collect step (not to be confused with the parameterless RDD.collect, which brings the data to the driver!):

// Assumes each record is a 5-tuple whose third field tags the record type,
// and that field a holds a vector in the format understood by
// org.apache.spark.mllib.linalg.Vectors.parse
val featuresStream = originalDStream.transform(rdd =>
  rdd.collect { case (a, b, c, d, e) if c == "client" => Vectors.parse(a) }
)
streamingKMeans.trainOn(featuresStream)
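
Once trained, the same model can score the incoming vectors; a minimal follow-up sketch using StreamingKMeans.predictOn on the stream defined above:

// Cluster assignment for each incoming feature vector
val assignments = streamingKMeans.predictOn(featuresStream)
assignments.print()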

We can also keep a dynamic set of filtered DStreams in some collection. Here we use a map that contains the key we filter on:

originalDStream.cache() // important for performance when a DStream is branched out
// filterKeys: Set[String]; getKey extracts the filter key from a record (placeholder)
val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => getKey(e) == key))
// do something with the different DStreams in this structure ...
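
For instance, one could train a separate model per key. A hypothetical sketch, assuming a helper extractNumericFeatures that turns a record into an Array[Double] of fixed dimension (here 10) - the helper, k, decay factor and dimension are all placeholder assumptions:

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors

// One StreamingKMeans model per filter key (k, decay and dimension are placeholders)
val modelsByKey = filterKeys.map { key =>
  key -> new StreamingKMeans().setK(3).setDecayFactor(1.0).setRandomCenters(10, 0.0)
}.toMap

// Train each model on its matching filtered stream
dstreamByFilterKey.foreach { case (key, dstream) =>
  modelsByKey(key).trainOn(dstream.map(e => Vectors.dense(extractNumericFeatures(e))))
}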

These snippets are code examples, to be completed with the actual logic.
