pyspark : ml + streaming


Problem Description

According to Combining Spark Streaming + MLlib it is possible to make a prediction over a stream of input in Spark.

The issue with the given example (which works on my cluster) is that testData is already given in the correct format.

I am trying to set up a client <-> server TCP exchange based on strings of data. I can't figure out how to transform the string into the correct format.

This works, though:

from pyspark.sql.types import StructType, StructField, FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassificationModel

sep       = ";"
str_recue = '0.0;0.1;0.2;0.3;0.4;0.5'
rdd       = sc.parallelize([str_recue])
chemin = "hdfs://xx.xx.xx.xx:8020/cart_model_for_cycliste_v2"
model  = DecisionTreeClassificationModel.load(chemin)
# work on the string
rdd2      = rdd.map(lambda data: data.split(sep))
rdd3      = rdd2.map(lambda tableau: [float(x) for x in tableau])
# build the DataFrame
cols      = ["c1", "c2", "c3", "c4", "c5", "c6"]
fields    = [StructField(x, FloatType(), True) for x in cols]
schema    = StructType(fields)
df        = spark.createDataFrame(rdd3, schema=schema)
# assemble a features column
assembler = VectorAssembler()
assembler = assembler.setInputCols(cols)
assembler = assembler.setOutputCol("features")
df2       = assembler.transform(df)
model.transform(df2).show()

giving:

+---+---+---+---+---+---+--------------------+-------------+-----------+----------+
| c1| c2| c3| c4| c5| c6|            features|rawPrediction|probability|prediction|
+---+---+---+---+---+---+--------------------+-------------+-----------+----------+
|0.0|0.1|0.2|0.3|0.4|0.5|[0.0,0.1000000014...| [0.0,3426.0]|  [0.0,1.0]|       1.0|
+---+---+---+---+---+---+--------------------+-------------+-----------+----------+

I can't figure out how to make it work while listening to a socket.

I have my server:

import socket
import random
import time
port = 12003
ip   = socket.gethostname()
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind((ip, port))
serversocket.listen(1)
(clientsocket, address) = serversocket.accept()
nb_d_envois = 10
tps_attente = 3
for i in range(nb_d_envois):
    time.sleep(tps_attente)
    sep     = ";"
    to_send = '0.0;0.1;0.2;0.3;0.4;0.5'
    print(to_send)
    clientsocket.send(to_send.encode())

which sends a string to my Spark Streaming context. What to do next? This is my question. According to https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/sql_network_wordcount.py it should be possible to do a foreachRDD.

So I created a function:

def prevoir(time, rdd):
    sep       = ";"
    chemin = "hdfs://54.37.12.49:8020/cart_model_for_cycliste_v2"
    model  = DecisionTreeClassificationModel.load(chemin)
    # work on the string
    rdd2      = rdd.map(lambda data: data.split(sep))
    rdd3      = rdd2.map(lambda tableau: [float(x) for x in tableau])
    # build the DataFrame
    cols      = ["c1", "c2", "c3", "c4", "c5", "c6"]
    fields    = [StructField(x, FloatType(), True) for x in cols]
    schema    = StructType(fields)
    df        = spark.createDataFrame(rdd3, schema=schema)
    # assemble a features column
    assembler = VectorAssembler()
    assembler = assembler.setInputCols(cols)
    assembler = assembler.setOutputCol("features")
    df2       = assembler.transform(df)
    model.transform(df2).show()

and applied it on a streaming context:

ssc     = StreamingContext(sc, 5)
dstream = ssc.socketTextStream(listen_to_ip, listen_to_port)
dstream.foreachRDD(prevoir)
ssc.start()
ssc.awaitTermination()

but nothing appears (not even the normal time info). There are no errors either.

My doubts are:

  • The function is not registered as a UDF, so I am suspicious whether it can be called at all

  • the loading of the model through HDFS should certainly be done only once and passed as a parameter

  • the "show" function seems to me not really distributed (but it works when not applied in 'foreachRDD')... => maybe I should save something to HDFS?

Any help welcome...
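On the second doubt: one option (a sketch under the assumption that the model is loaded once on the driver; the factory function name is hypothetical) is to close over an already-loaded model instead of re-reading HDFS on every micro-batch:

```python
# Sketch: load the model once on the driver, then build the foreachRDD
# callback as a closure over it. Column names, separator, and model path
# are taken from the question; make_prevoir itself is a hypothetical helper.

def make_prevoir(model, spark, cols, sep=";"):
    """Return a (time, rdd) callback that reuses an already-loaded model."""
    def prevoir(time, rdd):
        # pyspark imports kept inside so the factory itself is plain Python
        from pyspark.sql.types import StructType, StructField, FloatType
        from pyspark.ml.feature import VectorAssembler

        if rdd.isEmpty():  # foreachRDD also fires on empty micro-batches
            return
        rows = rdd.map(lambda line: [float(x) for x in line.split(sep)])
        schema = StructType([StructField(c, FloatType(), True) for c in cols])
        df = spark.createDataFrame(rows, schema=schema)
        assembler = VectorAssembler(inputCols=cols, outputCol="features")
        model.transform(assembler.transform(df)).show()
    return prevoir

# Driver side, with the question's path:
# model = DecisionTreeClassificationModel.load(
#     "hdfs://54.37.12.49:8020/cart_model_for_cycliste_v2")
# dstream.foreachRDD(make_prevoir(model, spark,
#                                 ["c1", "c2", "c3", "c4", "c5", "c6"]))
```

On the third doubt: foreachRDD itself runs on the driver, so show() printing to the driver console is expected; only the RDD/DataFrame operations inside it are distributed. Writing results to HDFS is only needed if another process must consume them.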

Recommended Answer

Data is not being sent from the server to the streaming context. The code is correct.
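One common mechanism behind "data is not being sent" in this setup (an assumption on my part, not something the answer states): socketTextStream splits its input on newlines, and the server in the question sends records with no terminator, so no complete line ever reaches the stream. A sketch of a newline-framing server, with the question's host/port and a hypothetical build_payload helper:

```python
# Sketch of a sending loop framed for socketTextStream, which is
# line-oriented: every record is newline-terminated, and sendall is used
# so partial writes are not silently dropped.

import socket
import time

def build_payload(values, sep=";"):
    """Encode one record as a sep-delimited, newline-terminated line."""
    return (sep.join(str(v) for v in values) + "\n").encode()

def serve(ip, port, nb_d_envois=10, tps_attente=3):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serversocket:
        serversocket.bind((ip, port))
        serversocket.listen(1)
        clientsocket, _ = serversocket.accept()
        with clientsocket:
            for _ in range(nb_d_envois):
                time.sleep(tps_attente)
                clientsocket.sendall(
                    build_payload([0.0, 0.1, 0.2, 0.3, 0.4, 0.5]))

if __name__ == "__main__":
    serve(socket.gethostname(), 12003)
```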
