Spark Streaming - Best way to Split Input Stream based on filter Param


Question

I am currently trying to create some kind of monitoring solution: some data is written to Kafka, and I read this data with Spark Streaming and process it.

To preprocess the data for machine learning and anomaly detection, I would like to split the stream based on some filter parameters. So far I have learned that DStreams themselves cannot be split into several streams.

The problem I am mainly facing is that many algorithms (like KMeans) only take continuous data, not discrete data such as a URL or some other string.

Ideally, my requirements would be:

  • Read data from Kafka and generate a list of strings based on what I read
  • Generate multiple streams based on that list of strings (split stream, filter stream, or whatever the best practice is)
  • Use these streams to train a different model per stream to obtain a baseline, and afterwards compare everything that follows against that baseline

I would be happy to get any suggestions on how to approach my problem. I cannot imagine that this scenario is not covered in Spark; however, until now I have not discovered a working solution.
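For context, the first step (reading from Kafka into a DStream) is typically done with the spark-streaming-kafka integration. A minimal sketch using the 0-10 direct stream API; the broker address, group id, and topic name are placeholders:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

// Assumed connection settings -- replace with the real broker and topic.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "monitoring"
)

// ssc is an existing StreamingContext; the payload is taken as a plain String.
val originalDStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Set("events"), kafkaParams)
).map(_.value)
```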

Answer

I think it should be sufficient to create derived DStreams from the original, using filter and map:

val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e))
val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e))

Note that these filter and map steps could be combined into one collect step (not to be confused with the parameterless RDD.collect that brings the data to the driver!):

val featuresStream = originalDStream.transform(rdd => 
  rdd.collect{case (a,b,c,d,e) if c=="client" => Vectors.parse(a)}
)
streamingKMeans.trainOn(featuresStream)
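The `streamingKMeans` instance used above would need to be constructed beforehand. A minimal sketch; the cluster count and vector dimension are assumptions that depend on the actual data:

```scala
import org.apache.spark.mllib.clustering.StreamingKMeans

val streamingKMeans = new StreamingKMeans()
  .setK(3)                  // number of clusters -- assumption
  .setDecayFactor(1.0)      // weight all batches equally
  .setRandomCenters(5, 0.0) // dim = 5 must match the parsed vectors -- assumption
```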

We can also keep a dynamic set of filtered DStreams in some collection. Here we use a map whose keys are the values we filter on:

originalDStream.cache() // important for performance when a DStream is branched out
// filterKeys: Set[String]
val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => getKey(e) == key)).toMap
// do something with the different DStreams in this structure ...
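Building on that structure, one could train a separate model per key. A sketch, assuming a `toFeatureVector` helper (hypothetical) that turns an event into an MLlib `Vector`:

```scala
import org.apache.spark.mllib.clustering.StreamingKMeans

// One StreamingKMeans model per filter key; k and dim are placeholder values.
val modelsByKey = dstreamByFilterKey.map { case (key, dstream) =>
  val model = new StreamingKMeans().setK(3).setRandomCenters(5, 0.0)
  model.trainOn(dstream.map(e => toFeatureVector(e))) // toFeatureVector: assumed helper
  key -> model
}.toMap
```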

These snippets are code examples to be completed with the actual logic.

