Spark Streaming - Best way to Split Input Stream based on filter Param
Problem Description
I am currently trying to build a kind of monitoring solution: some data is written to Kafka, and I read and process that data with Spark Streaming.
To preprocess the data for machine learning and anomaly detection, I would like to split the stream based on some filter parameters. So far I have learned that a DStream itself cannot be split into several streams.
The problem I am mainly facing is that many algorithms (like KMeans) only take continuous data, not discrete data such as a URL or some other string.
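(One common workaround is to encode discrete string fields numerically before clustering, e.g. with the hashing trick. The snippet below is only an illustrative sketch using MLlib's HashingTF; the URL tokenization and the vector size are arbitrary assumptions.)

import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector

// Hash a discrete string field (e.g. a URL) into a fixed-size numeric vector
// so it can be fed to KMeans. 64 dimensions is an arbitrary choice.
val hashingTF = new HashingTF(64)

def encodeUrl(url: String): Vector =
  hashingTF.transform(url.split("[/.:?&=]+").toSeq) // naive tokenization of the URL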
My requirements would ideally be (a rough sketch of the ingestion step follows this list):
- Read the data from Kafka and generate a list of strings based on what I read
- Generate multiple streams based on that list of strings (split the stream, filter the stream, or whatever is best practice)
- Use those streams to train a different model per stream to get a baseline, and later compare everything that follows against the baseline
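A minimal sketch of the ingestion step, assuming the spark-streaming-kafka-0-10 integration, an existing StreamingContext ssc, and a hypothetical topic name; the splitting and training steps are covered in the answer below:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Hypothetical Kafka connection settings - adjust to your cluster.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "monitoring"
)

// rawStream: DStream[String] with the raw monitoring records; parsing them into
// the structure expected downstream is application-specific.
val rawStream = KafkaUtils
  .createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](Seq("monitoring-topic"), kafkaParams))
  .map(record => record.value)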
I would be happy to get any suggestions on how to approach this problem. I cannot imagine that this scenario is not covered in Spark; however, so far I have not found a working solution.
Recommended Answer
I think it should be sufficient to create derived DStreams from the original one, using filter and map:
val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e))
val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e))
Note that these filter and map steps could be combined into a single collect step (not to be confused with the parameterless RDD.collect, which brings the data to the driver!):
import org.apache.spark.mllib.linalg.Vectors

val featuresStream = originalDStream.transform(rdd =>
  rdd.collect { case (a, b, c, d, e) if c == "client" => Vectors.parse(a) }
)
streamingKMeans.trainOn(featuresStream)
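For reference, the streamingKMeans instance used above could be constructed along these lines (a sketch; the number of clusters, the decay factor, and the vector dimensionality are assumptions, not values from the original answer):

import org.apache.spark.mllib.clustering.StreamingKMeans

val streamingKMeans = new StreamingKMeans()
  .setK(3)                  // assumed number of clusters
  .setDecayFactor(1.0)      // weight all batches equally over time
  .setRandomCenters(2, 0.0) // dimensionality must match the parsed feature vectors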
We can also keep a dynamic set of filtered DStreams in some collection. Here we use a map keyed by the value we filter on:
originalDStream.cache() // important for performance when a DStream is branched out.
// filterKeys: Set[String]
val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => (getKey(e)==key)))
// do something with the different DStreams in this structure ...
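One possible continuation is to feed each branch into its own model (a sketch; extractFeatures is the same hypothetical record-to-vector function as above):

// Train one StreamingKMeans model per filter key.
val modelByKey = dstreamByFilterKey.map { case (key, dstream) =>
  val model = new StreamingKMeans().setK(3).setDecayFactor(1.0).setRandomCenters(2, 0.0)
  model.trainOn(dstream.map(e => extractFeatures(e)))
  key -> model
}.toMap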
These snippets are code examples to be completed with the actual logic.