How to store grouped data into JSON in PySpark

Question

I am new to pyspark

I have a dataset which looks like this (just a snapshot of a few columns):

I want to group my data by key. My key is

CONCAT(a.div_nbr,a.cust_nbr)

My ultimate goal is to convert the data into JSON, formatted like this:

k1[{v1,v2,....},{v1,v2,....}], k2[{v1,v2,....},{v1,v2,....}],....

For example:

248138339 [{ PRECIMA_ID:SCP 00248 0000138339, PROD_NBR:5553505, PROD_DESC:Shot and a Beer Battered Onion Rings (5553505 and 9285840) , PROD_BRND:Molly's Kitchen,PACK_SIZE:4/2.5 LB, QTY_UOM:CA } , 
        { PRECIMA_ID:SCP 00248 0000138339 , PROD_NBR:6659079 , PROD_DESC:Beef Chuck Short Rib Slices, PROD_BRND:Stockyards , PACK_SIZE:12 LBA , QTY_UOM:CA} ,{...,...,} ],

1384611034793 [{},{},{}],....

I have created a dataframe (I am basically joining two tables to get some more fields):

joinstmt = sqlContext.sql("""
    SELECT a.precima_id, CONCAT(a.div_nbr, a.cust_nbr) AS key,
           a.prod_nbr, a.prod_desc, a.prod_brnd, a.pack_size, a.qty_uom,
           a.sales_opp, a.prc_guidance, a.pim_mrch_ctgry_desc, a.pim_mrch_ctgry_id,
           b.start_date, b.end_date
    FROM scoop_dtl a
    JOIN scoop_hdr b ON (a.precima_id = b.precima_id)
""")

Now, in order to get the above result, I need to group the result by the key. I did the following:

groupbydf = joinstmt.groupBy("key")

This resulted in a GroupedData object, and after reading up I learned that I cannot use it directly; I need to convert it back into a dataframe to store it.

I am new to this and need some help in order to convert it back into a dataframe; I would also appreciate any other approaches.

Answer

If your joined dataframe looks like this:

gender  age
M   5
F   50
M   10
M   10
F   10

You can then use the code below to get the desired output:

joinedDF.groupBy("gender") \ 
    .agg(collect_list("age").alias("ages")) \
    .write.json("jsonOutput.txt")

Output would look like:

{"gender":"F","ages":[50,10]}
{"gender":"M","ages":[5,10,10]}

In case you have multiple columns, like name and salary, you can add them like below:

df.groupBy("gender")
    .agg(collect_list("age").alias("ages"),collect_list("name").alias("names"))

Your output would look like:

{"gender":"F","ages":[50,10],"names":["ankit","abhay"]}
{"gender":"M","ages":[5,10,10],"names":["snchit","mohit","rohit"]}
