how to merge rows into column of spark dataframe as valid json to write it in mysql


Question

I am trying to merge multiple rows into one column as valid JSON in a Spark dataframe (Spark 1.6.1), and then store the result in a MySQL table.

我的原始火花数据框如下:

my origin spark dataframe like below:

|user_id   |product_id|price       | 
|A         |p1        |3000        |
|A         |p2        |1500        |
|B         |P1        |3000        |
|B         |P3        |2000        |

I want to convert the above table into something like this:

|user_id   |contents_json 
|A         |{(product_id:p1, price:3000), (product_id:p2, price:1500)} 
|B         |{(product_id:p1, price:3000), (product_id:p3, price:2000)} 

and then put the above table into a MySQL table.

It is exactly the opposite of explode, but I can't find the right way to do it.

Answer

I assume you are looking for the JSON output shown below.

from pyspark.sql.functions import col, collect_list, struct

df = sc.parallelize([('A','P1',3000), ('A','P2',1500),
                     ('B','P1',3000), ('B','P3',2000)]).toDF(["user_id", "product_id","price"])

Spark 2.0+

df1 = df.\
    groupBy("user_id").agg(collect_list(struct(col("product_id"),col("price"))).alias("contents_json"))
df1.show()

Spark 1.6

import pyspark.sql.functions as psf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

# Zip the two collected lists back into a single array of structs
zipCols = psf.udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("product_id", StringType()),
      StructField("price", IntegerType())
  ]))
)

df1 = df.\
    groupBy("user_id").agg(
        zipCols(
            collect_list(col("product_id")), 
            collect_list(col("price"))
        ).alias("contents_json")
    )


for row in df1.toJSON().collect():
    print(row)

The output is:

{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}
{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}
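To land this in MySQL, the usual route is Spark's JDBC writer, after the struct column has been serialized to a string (for example with `to_json` on Spark 2.1+). The following is a sketch only; the URL, credentials, driver on the classpath, and table name are all hypothetical placeholders, not from the original answer:

```python
# Sketch: write the aggregated dataframe to MySQL over JDBC.
# Requires the MySQL JDBC driver jar on the Spark classpath; all
# connection details below are placeholders.
df1.write.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/mydb",   # placeholder URL
    driver="com.mysql.jdbc.Driver",
    dbtable="user_contents",                  # placeholder table name
    user="myuser",                            # placeholder credentials
    password="mypassword",
).mode("append").save()
```

With the column stored as text, MySQL 5.7+ could also declare it as a native JSON column and validate it on insert.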

