DataFrame to RDD[(String, String)] conversion


Question


I want to convert an org.apache.spark.sql.DataFrame to org.apache.spark.rdd.RDD[(String, String)] in Databricks. Can anyone help?


Background (and a better solution is also welcome): I have a Kafka stream which (after some steps) becomes a 2 column data frame. I would like to put this into a Redis cache, first column as a key and second column as a value.


More specifically the type of the input is this: lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]. I try to put into Redis as follows:

sc.toRedisKV(lastContacts)(redisConfig)

The error message looks like this:

notebook:20: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
    (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)

I already played around with some ideas (such as the .rdd method) but none helped.
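Calling .rdd on its own is not enough here: it only turns the Dataset[Row] into an RDD[Row], so the element type still does not match what toRedisKV expects. A sketch of the mismatch, using the lastContacts DataFrame from the question:

```scala
// .rdd alone changes the container type, not the element type:
// the elements are still Row, not (String, String).
val rows = lastContacts.rdd
// rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
```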

Answer

You can use df.map(row => ...) to convert the dataframe to an RDD if you want to map each row to a different RDD element.

For example:

val df = Seq(("table1", 432),
             ("table2", 567),
             ("table3", 987),
             ("table1", 789)).toDF("tablename", "Code")

df.show()

+---------+----+
|tablename|Code|
+---------+----+
|   table1| 432|
|   table2| 567|
|   table3| 987|
|   table1| 789|
+---------+----+

// A tuple of untyped values has no Encoder in the Dataset API,
// so go through the RDD API directly:
val rddDf = df.rdd.map(r => (r(0), r(1)))  // Type: RDD[(Any, Any)]

OR

// Converting each value to String yields the type the question asks for:
val rdd = df.rdd.map(r => (r(0).toString, r(1).toString))  // Type: RDD[(String, String)]
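Applied to the DataFrame from the question, the same pattern produces the key/value pairs that the spark-redis call expects. A minimal sketch, assuming the question's lastContacts schema ([serialNumber: string, lastModified: bigint]) and an already-defined redisConfig:

```scala
// serialNumber is a string column, lastModified a bigint (Long),
// so both are converted to String for the Redis key/value pair.
val kv = lastContacts.rdd.map(r => (r.getString(0), r.getLong(1).toString))
// kv: org.apache.spark.rdd.RDD[(String, String)]

sc.toRedisKV(kv)(redisConfig)  // now matches the expected parameter type
```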

Please refer to https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html regarding AnalysisException: Queries with streaming sources must be executed with writeStream.start().

You need to wait for the termination of the query using query.awaitTermination() to prevent the process from exiting while the query is active.
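For the streaming side, a minimal sketch of that pattern; the sink format and output mode here are illustrative placeholders, not taken from the question:

```scala
// Start the streaming query, then block until it terminates.
// "console" and "append" are placeholder choices for illustration.
val query = lastContacts.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()  // keeps the driver process alive while the query runs
```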

