How to merge rows into a column of a Spark DataFrame as valid JSON to write it to MySQL
Question
I am trying to merge multiple rows into one column as valid JSON in a Spark DataFrame (Spark 1.6.1), and then store the result in a MySQL table.
My original Spark DataFrame looks like this:
|user_id |product_id |price |
|A       |p1         |3000  |
|A       |p2         |1500  |
|B       |P1         |3000  |
|B       |P3         |2000  |
I want to convert the table above like this:
|user_id |contents_json
|A       |{(product_id:p1, price:3000), (product_id:p2, price:1500)}
|B       |{(product_id:p1, price:3000), (product_id:p3, price:2000)}
and then put the table above into a MySQL table.
It is exactly the opposite of explode, but I can't find the right way to do it.
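The operation being asked for is the inverse of explode: group by user_id and collect the remaining fields into one list per user. Independent of Spark, the same grouping can be sketched in plain Python with the standard library (the row data below mirrors the sample table above):

```python
import json
from itertools import groupby
from operator import itemgetter

rows = [
    ("A", "p1", 3000),
    ("A", "p2", 1500),
    ("B", "P1", 3000),
    ("B", "P3", 2000),
]

# Group rows by user_id, collect the remaining fields into a list of
# dicts, then serialize each list as one valid JSON string per user.
merged = {}
for user_id, group in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
    contents = [{"product_id": pid, "price": price} for _, pid, price in group]
    merged[user_id] = json.dumps(contents)
```

In Spark the same idea is a collect_list aggregation over a groupBy, which is what the answer below does.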
Answer
I assume you are looking for the JSON output shown below.
from pyspark.sql.functions import col, collect_list, struct

df = sc.parallelize([('A', 'P1', 3000), ('A', 'P2', 1500),
                     ('B', 'P1', 3000), ('B', 'P3', 2000)]).toDF(["user_id", "product_id", "price"])
Spark 2.0
df1 = df.\
    groupBy("user_id").agg(collect_list(struct(col("product_id"), col("price"))).alias("contents_json"))
df1.show()
Spark 1.6
import pyspark.sql.functions as psf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

zipCols = psf.udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        # Adjust the field types to match your data
        StructField("product_id", StringType()),
        StructField("price", IntegerType())
    ]))
)

df1 = df.\
    groupBy("user_id").agg(
        zipCols(
            collect_list(col("product_id")),
            collect_list(col("price"))
        ).alias("contents_json")
    )
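The core of this Spark 1.6 workaround is just an element-wise zip of the two collected lists. Stripped of Spark, the logic looks like this (the sample lists mirror user B's rows):

```python
# Zip the two collected columns element-wise into (product_id, price)
# pairs -- each pair becomes one struct in the resulting array column.
def zip_cols(product_ids, prices):
    return list(zip(product_ids, prices))

pairs = zip_cols(["P1", "P3"], [3000, 2000])
```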
for row in df1.toJSON().collect():
    print(row)
The output is:
{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}
{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}
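Each printed row is a valid JSON document, which can be verified with the standard library (the strings below are copied from the output above):

```python
import json

output_rows = [
    '{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}',
    '{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}',
]

for line in output_rows:
    # json.loads raises ValueError if the line is not valid JSON
    record = json.loads(line)
    print(record["user_id"], len(record["contents_json"]))
```

To land this in MySQL, the usual route is to turn contents_json into a string column first (a UDF wrapping json.dumps on Spark 1.6, or the built-in to_json function on Spark 2.1+) and then write with df1.write.jdbc(...); the exact JDBC URL, table name, and driver options depend on your setup.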