How to convert Spark Streaming data into Spark DataFrame


Question

So far, Spark hasn't provided a DataFrame API for streaming data, but when I am doing anomaly detection, it is more convenient and faster to use DataFrames for data analysis. I have finished that part, but when I try to do real-time anomaly detection on streaming data, problems appear. I tried several ways and still could not convert the DStream into a DataFrame, nor convert the RDDs inside the DStream into DataFrames.

Here's part of my latest version of the code:

import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator


sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)   # 5-second micro-batches
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    # Intended to turn one micro-batch into a DataFrame and preview it
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    # get_tuple() is a parsing helper defined elsewhere in the full script
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    # The conversion attempt that triggers the error described below
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

As you can see in the main() function, when I read the input streaming data with ssc.socketTextStream(), it generates a DStream; I then try to convert each individual record in the DStream into a Row, hoping the data can be converted into a DataFrame later.
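
In case the Row factory pattern used above is unfamiliar, here is a quick standalone illustration; the sample string is invented:

from pyspark.sql import Row

Features = Row('rawFeatures')   # a Row "factory" with one named field
row = Features('1.0,2.0,3.0')   # -> Row(rawFeatures='1.0,2.0,3.0')
print(row.rawFeatures)          # -> 1.0,2.0,3.0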

If I use pprint() to print out features_rdd here, it works, which makes me think each individual element of features_rdd is a batch of RDDs while the whole features_rdd is a DStream.

Then I created the streamrdd_to_df() method, hoping to convert each batch RDD into a DataFrame, but it gives me an error:

ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

Does anyone have any thoughts on how to do DataFrame operations on Spark Streaming data?

Answer

After 1 year, I started to explore the Spark 2.0 streaming methods and finally solved my anomaly detection problem. Here's my code in IPython, where you can also see what my raw data input looks like.
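
The notebook itself isn't reproduced here, but as a minimal sketch of the Spark 2.0 approach the answer refers to: Structured Streaming reads the socket source directly as a streaming DataFrame, so no DStream-to-DataFrame conversion is needed at all. The host and port below are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()

# Spark 2.0+: the socket source is read directly as an unbounded DataFrame
lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")   # placeholder host
         .option("port", 9999)          # placeholder port
         .load())

# Ordinary DataFrame operations (select, filter, groupBy, ...) now apply;
# the console sink prints each micro-batch as it arrives
query = lines.writeStream.format("console").start()
query.awaitTermination()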

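For completeness: if you stay on the DStream API from the question, the usual fix for the "No output operations registered" error is to register foreachRDD() as the output operation and build a DataFrame from each micro-batch inside it. This is a generic sketch, not code from the original answer; the empty-batch guard is needed because createDataFrame() cannot infer a schema from an empty RDD:

def process(rdd):
    # Skip empty micro-batches
    if rdd.isEmpty():
        return
    sdf = sqlContext.createDataFrame(rdd)   # rdd already contains Row objects
    sdf.show(n=2, truncate=False)

# Replaces the flatMap(streamrdd_to_df) line: foreachRDD is an output
# operation, so the streaming context has work to execute on ssc.start()
features_rdd.foreachRDD(process)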
