Inappropriate output while creating a DataFrame


Question

I'm trying to stream data from a Kafka topic using a Scala application. I'm able to get the data from the topic, but how do I create a DataFrame out of it?

Here is the data (in string, string format):

{
  "action": "AppEvent",
  "tenantid": 298,
  "lat": 0.0,
  "lon": 0.0,
  "memberid": 16390,
  "event_name": "CATEGORY_CLICK",
  "productUpccd": 0,
  "device_type": "iPhone",
  "device_os_ver": "10.1",
  "item_name": "CHICKEN"
}

I tried a few ways to do it, but none yielded satisfactory results.

 +--------------------+
 |                  _1|
 +--------------------+
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|
 |{"action":"AppEve...|

Can anyone tell me how to do the mapping so that each field goes into a separate column, like a table? The data is in Avro format.

Here is the code that gets the data from the topic:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(2))
val kafkaConf = Map[String, String](
  "metadata.broker.list" -> "####",
  "zookeeper.connect" -> "########",
  "group.id" -> "KafkaConsumer",
  "zookeeper.connection.timeout.ms" -> "1000000")
val topicMaps = Map("fishbowl" -> 1)
// The payload is a JSON string, so StringDecoder is the right choice here:
// DefaultDecoder yields Array[Byte] and does not match the
// [String, String] key/value type parameters.
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER).map(_._2)

Please guide me on how to use the foreachRDD function and map() to create a proper DataFrame.

Recommended answer

To create a DataFrame out of an RDD irrespective of its case class schema, use the logic below:

stream.foreachRDD { rdd =>
  val dataFrame = sqlContext.read.json(rdd.map(_._2))
  dataFrame.show()
}

Here `stream` is the DStream returned by KafkaUtils.createStream().
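Note that `sqlContext.read.json` infers the schema by scanning each micro-batch, which can be slow and can produce different schemas for different batches. As a sketch (assuming the field names and types from the sample JSON in the question), you can declare the schema explicitly so every batch parses the same way:

```scala
import org.apache.spark.sql.types._

// Explicit schema matching the sample event; the field names and types
// below are assumptions based on the JSON shown in the question.
val eventSchema = StructType(Seq(
  StructField("action",        StringType),
  StructField("tenantid",      LongType),
  StructField("lat",           DoubleType),
  StructField("lon",           DoubleType),
  StructField("memberid",      LongType),
  StructField("event_name",    StringType),
  StructField("productUpccd",  LongType),
  StructField("device_type",   StringType),
  StructField("device_os_ver", StringType),
  StructField("item_name",     StringType)))

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {                 // skip empty micro-batches
    val df = sqlContext.read
      .schema(eventSchema)            // no per-batch schema inference
      .json(rdd.map(_._2))            // parse the JSON message values
    df.show()
  }
}
```

With the schema fixed up front, each field of the event lands in its own typed column (`action`, `tenantid`, `lat`, ...), which is the table-like layout the question asks for.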

