How to merge rows into a column of a Spark dataframe as valid JSON to write it in MySQL
Problem Description
I am trying to merge multiple rows into one column as valid JSON in a Spark dataframe (Spark 1.6.1), and then store the result in a MySQL table.
My original Spark dataframe looks like this:
|user_id |product_id|price|
|A       |p1        |3000 |
|A       |p2        |1500 |
|B       |P1        |3000 |
|B       |P3        |2000 |
I want to convert the above table into this:
|user_id |contents_json
|A       |{(product_id:p1, price:3000), (product_id:p2, price:1500)}
|B       |{(product_id:p1, price:3000), (product_id:p3, price:2000)}
and then write the above table into a MySQL table.
It is exactly the opposite of explode, but I can't find the right way to do it.
Answer
I assume you are looking for the JSON output shown below.
from pyspark.sql.functions import col, collect_list, struct

df = sc.parallelize([('A', 'P1', 3000), ('A', 'P2', 1500),
                     ('B', 'P1', 3000), ('B', 'P3', 2000)]).toDF(["user_id", "product_id", "price"])
Spark >= 2.0:
df1 = df.groupBy("user_id").agg(
    collect_list(struct(col("product_id"), col("price"))).alias("contents_json"))
df1.show()
Spark 1.6:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

zipCols = udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        # Adjust types to reflect the actual data types
        StructField("product_id", StringType()),
        StructField("price", IntegerType())
    ]))
)
df1 = df.groupBy("user_id").agg(
    zipCols(
        collect_list(col("product_id")),
        collect_list(col("price"))
    ).alias("contents_json")
)
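As a quick sanity check, the same group-and-zip logic can be sketched in plain Python (no Spark required); the data below mirrors the example dataframe, and the output shape matches what `collect_list` over zipped columns produces:

```python
import json
from collections import OrderedDict
from itertools import groupby

rows = [('A', 'P1', 3000), ('A', 'P2', 1500),
        ('B', 'P1', 3000), ('B', 'P3', 2000)]

# Group by user_id and collect (product_id, price) pairs per user,
# mirroring groupBy("user_id") + collect_list(zip(product_id, price)).
for user_id, grp in groupby(sorted(rows), key=lambda r: r[0]):
    contents = [OrderedDict([("product_id", p), ("price", pr)])
                for _, p, pr in grp]
    # One JSON document per user, as df1.toJSON() would emit it
    print(json.dumps(OrderedDict([("user_id", user_id),
                                  ("contents_json", contents)])))
```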
for row in df1.toJSON().collect():
    print(row)
The output is:
{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}
{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}
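To actually land this in MySQL, the array-of-struct column first needs to become a string, since MySQL has no native array type; on Spark 2.1+ `to_json` can serialize it, and the frame can then go out over the JDBC data source. The URL, table name, and credentials below are placeholders, not values from the original post; this is only a sketch:

```python
# Placeholder connection string -- adjust host, port, and database.
JDBC_URL = "jdbc:mysql://localhost:3306/mydb"

def write_contents_json(df1):
    # Serialize the aggregated column to a JSON string (to_json
    # requires Spark 2.1+), then append to a MySQL table over JDBC.
    from pyspark.sql.functions import col, to_json
    out = df1.withColumn("contents_json", to_json(col("contents_json")))
    (out.write
        .format("jdbc")
        .option("url", JDBC_URL)
        .option("dbtable", "user_contents")  # placeholder table name
        .option("user", "root")              # placeholder credentials
        .option("password", "secret")
        .mode("append")
        .save())
```

Note the MySQL Connector/J jar must be on the Spark driver and executor classpath for the JDBC write to work.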
Hope this helps!