How to convert Spark Streaming output into a dataframe or store it in a table


Question

My code is:

import org.apache.spark.streaming.kafka.KafkaUtils

val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))
val data = lines.map(_._2)
data.print()

My output has 50 different values in a format as below:
{"id:st04","data:26-02-2018 20:30:40","temp:30", "press:20"}

Can anyone help me store this data in a table form like:

| id |date               |temp|press|   
|st01|26-02-2018 20:30:40| 30 |20   |  
|st01|26-02-2018 20:30:45| 80 |70   |  

I would really appreciate it.

Answer

You can use the foreachRDD function together with the normal Dataset API:

data.foreachRDD(rdd => {
  // rdd is an RDD[String]
  // foreachRDD is executed on the driver, so you can use a SparkSession here;
  // spark is a SparkSession (for Spark 1.x use SQLContext instead)
  val df = spark.read.json(rdd) // or sqlContext.read.json(rdd)
  df.show()
  df.write.saveAsTable("here some unique table ID")
})

However, if you use Spark 2.x, I would suggest using Structured Streaming:

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // broker address -- adjust to your setup
  .option("subscribe", "hello")                        // the topic from the question
  .load()
val data = stream
  .selectExpr("cast(value as string) as value")
  .select(from_json(col("value"), schema))
data.writeStream.format("console").start()

You have to specify the schema manually, but it's quite simple :) Also import org.apache.spark.sql.functions._ before any processing.
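A minimal sketch of such a schema, assuming the records actually carry the four fields from the sample (note that the output as printed, {"id:st04", ...}, is not valid JSON; this assumes the real records look like {"id":"st04","data":"26-02-2018 20:30:40","temp":"30","press":"20"}):

```scala
import org.apache.spark.sql.types._

// Schema matching the four fields of the sample record; all values are kept
// as strings here -- "data" can later be parsed with to_timestamp, and
// "temp"/"press" cast to a numeric type if needed.
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("data", StringType),
  StructField("temp", StringType),
  StructField("press", StringType)
))
```

This schema is what from_json(col("value"), schema) in the snippet above expects as its second argument.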

