Combining Spark Streaming + MLlib


Question


I've tried to use a Random Forest model to predict a stream of examples, but it appears that I cannot use that model to classify them. Here is the code used in PySpark:

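from pyspark import SparkContext
from pyspark.mllib.tree import RandomForest
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="App")

# trainingData, hostname, port and parse are defined elsewhere (not shown)
model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={}, impurity='gini', numTrees=150)

ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream(hostname, int(port))

parsedLines = lines.map(parse)
parsedLines.pprint()

# This lambda captures `model`; it is what triggers the error below
predictions = parsedLines.map(lambda event: model.predict(event.features))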


And here is the error returned when running it on the cluster:

  Error : "It appears that you are attempting to reference SparkContext from a broadcast "
    Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.


Is there a way to use a model generated from static data to predict streaming examples?


Thanks, guys, I really appreciate it!

Recommended answer


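Yes, you can use a model generated from static data. The problem you are experiencing is not related to streaming at all. You simply cannot use a JVM-based model inside an action or a transformation (see "How to use Java/Scala function from an action or a transformation?" for an explanation of why). In PySpark, an MLlib model is only a thin wrapper around a JVM object and keeps a reference to the SparkContext, so when Spark serializes your lambda to ship it to the workers, it tries to serialize that context as well and raises the SPARK-5063 error. Instead, apply the predict method to a complete RDD, for example by using transform on the DStream: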
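To make the failure mode concrete, here is a minimal pure-Python sketch (no Spark required) of what happens when a closure captures a driver-only object. DriverOnlyContext, JvmBackedModel and ship_to_worker are hypothetical stand-ins: the first mimics how SparkContext refuses to be serialized, the second mimics an MLlib model wrapper that holds a reference to the context, and the third mimics the serialization a task closure goes through. The real PySpark classes differ in detail, and the predict logic is a placeholder; the sketch only shows which step fails.

```python
import pickle


class DriverOnlyContext:
    """Hypothetical stand-in for SparkContext: it may only live on the driver."""

    def __reduce__(self):
        # SparkContext raises a similar error when Spark tries to serialize it
        # as part of a task closure (see SPARK-5063).
        raise RuntimeError(
            "It appears that you are attempting to reference SparkContext "
            "from a broadcast variable, action, or transformation."
        )


class JvmBackedModel:
    """Hypothetical stand-in for an MLlib model that wraps a JVM object."""

    def __init__(self, ctx):
        self._ctx = ctx  # reference to the driver-only context

    def predict(self, features):
        # Placeholder logic; in PySpark the real call goes to the JVM model.
        return [1 if sum(x) > 0 else 0 for x in features]


def ship_to_worker(obj):
    """Serialize obj, roughly as a task closure is before it is sent to a worker."""
    return pickle.dumps(obj)


model = JvmBackedModel(DriverOnlyContext())

# Predicting a whole batch on the driver works.
print(model.predict([[0.5, 0.2], [-1.0, 0.1]]))  # [1, 0]

# Shipping the model to a worker (what a per-record lambda implies) fails.
try:
    ship_to_worker(model)
except RuntimeError as err:
    print("serialization failed:", err)

# Without the context reference the same object serializes fine,
# which shows the reference itself is the problem.
restored = pickle.loads(ship_to_worker(JvmBackedModel(ctx=None)))
```

The fix in the answer follows the same logic: the function passed to transform runs on the driver once per batch, so model.predict(rdd) is called there on the whole RDD and the model object never has to be serialized.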

from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from operator import attrgetter


sc = SparkContext("local[2]", "foo")
ssc = StreamingContext(sc, 1)

data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
trainingData, testData = data.randomSplit([0.7, 0.3])

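# categoricalFeaturesInfo is a required argument; {} means all features are continuous
model = RandomForest.trainClassifier(
    trainingData, numClasses=2, categoricalFeaturesInfo={}, numTrees=3
)

(ssc
    .queueStream([testData])
    # Extract features
    .map(attrgetter("features"))
    # Predict on the whole RDD; the function passed to transform runs on the driver
    .transform(lambda _, rdd: model.predict(rdd))
    .pprint())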

ssc.start()
ssc.awaitTerminationOrTimeout(10)
