PySpark: create dict of dicts from dataframe?

Question

I have data in the following format, loaded from Hive into a dataframe:

date, stock, price
1388534400, GOOG, 50
1388534400, FB, 60
1388534400, MSFT, 55
1388620800, GOOG, 52
1388620800, FB, 61
1388620800, MSFT, 55

Here date is the epoch timestamp for midnight on that day, and the data goes back 10 years or so (800 million+ rows). I'd like to get a dictionary like this:

{
    'GOOG': {
        '1388534400': 50,
        '1388620800': 52
    },
    'FB': {
        '1388534400': 60,
        '1388620800': 61
    }
}

A naive way would be to get a list of unique stocks and then, for each stock, filter the dataframe down to that stock's rows, but this seems horribly inefficient. Can this be done easily in Spark? I currently have it working in native Python using PyHive, but given the sheer volume of data, I'd rather do this on a cluster with Spark.
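For comparison, the native Python approach mentioned above could be done in a single pass rather than with one filter per stock. The following is only a minimal sketch, assuming the rows arrive as (date, stock, price) tuples; the function name rows_to_nested_dict is hypothetical and does not come from the original post. It still holds everything in one process's memory, which is the limitation the question is trying to escape.

```python
from collections import defaultdict


def rows_to_nested_dict(rows):
    """Group (date, stock, price) rows into {stock: {date: price}} in one pass."""
    nested = defaultdict(dict)
    for date, stock, price in rows:
        # Dates become string keys to match the desired output shape.
        nested[stock][str(date)] = price
    return dict(nested)


# Sample rows copied from the question.
rows = [
    (1388534400, "GOOG", 50),
    (1388534400, "FB", 60),
    (1388620800, "GOOG", 52),
    (1388620800, "FB", 61),
]
result = rows_to_nested_dict(rows)
print(result["GOOG"])
```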

Recommended answer

In Spark 2.4 you can use map_from_arrays to build the date-to-price maps when aggregating the values for each stock. Then it's just a matter of using create_map to make the ticker symbol the key. This example uses ChainMap from the Python 3 collections module (available since Python 3.3) to build the final dict structure you described.

import json
from collections import ChainMap
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, create_map, expr, map_from_arrays, trunc

spark = SparkSession \
    .builder \
    .appName("example") \
    .getOrCreate()

df = spark.createDataFrame([
    (1388534400, "GOOG", 50),
    (1388534400, "FB", 60),
    (1388534400, "MSFT", 55),
    (1388620800, "GOOG", 52),
    (1388620800, "FB", 61),
    (1388620800, "MSFT", 55)]
).toDF("date", "stock", "price")

out = df.groupBy("stock") \
        .agg(
            map_from_arrays(
                collect_list("date"), collect_list("price")).alias("values")) \
        .select(create_map("stock", "values").alias("values")) \
        .rdd.flatMap(lambda x: x) \
        .collect()

print(json.dumps(dict(ChainMap(*out)), indent=4, separators=(',', ': '), sort_keys=True))

Which gives:

{
    "FB": {
        "1388534400": 60,
        "1388620800": 61
    },
    "GOOG": {
        "1388534400": 50,
        "1388620800": 52
    },
    "MSFT": {
        "1388534400": 55,
        "1388620800": 55
    }
}
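The final merging step can be looked at in isolation, without Spark. The sketch below assumes the collected result is a list of single-key dicts, one per stock, with integer dates as the inner keys; the variable name collected and its contents are illustrative stand-ins for the real collect() output. It shows that ChainMap combines the per-stock dicts and that json.dumps is what turns the integer dates into string keys.

```python
import json
from collections import ChainMap

# Assumed shape of the collected rows: one single-key dict per stock,
# with integer epoch dates as the inner keys.
collected = [
    {"GOOG": {1388534400: 50, 1388620800: 52}},
    {"FB": {1388534400: 60, 1388620800: 61}},
]

# ChainMap presents the separate dicts as one mapping; dict() flattens it.
merged = dict(ChainMap(*collected))

# json.dumps converts the integer date keys to strings on output.
as_json = json.dumps(merged, sort_keys=True)
print(as_json)
```

Because each stock appears in exactly one of the collected dicts, the lookup order that ChainMap applies to duplicate keys does not matter here.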

However, since you say you have a lot of data, you probably don't actually want to build this dictionary in memory. You would likely be better off splitting it up and writing the same dictionary structure to files for different partitions.

Let's do that by truncating the dates to the month and writing a separate file for each month and each stock:

out = df.groupBy(trunc(expr("CAST(date as TIMESTAMP)"), "month").alias("month"), df["stock"]) \
        .agg(
            map_from_arrays(
                collect_list("date"), collect_list("price")).alias("values")) \
        .select("month", "stock", create_map("stock", "values").alias("values"))

out.write.partitionBy("month", "stock").format("json").save("out/prices")
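To see what the month truncation does to a single value, here is a plain Python sketch of the same idea. It assumes the epoch values are interpreted in UTC, whereas Spark applies its session time zone when casting to a timestamp; the helper name month_start is hypothetical.

```python
from datetime import datetime, timezone


def month_start(epoch_seconds):
    """Return the first day of the month (in UTC) containing the timestamp."""
    moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return moment.date().replace(day=1)


# 1388620800 is midnight UTC on 2014-01-02, so it falls in January 2014.
print(month_start(1388620800).isoformat())
```

Under that UTC assumption, both sample dates map to the same month, which is why the example data ends up under a single month directory.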

This gives you a directory structure like the following:

out
└── prices
    ├── _SUCCESS
    └── month=2014-01-01
        ├── stock=FB
        │   └── part-00093-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
        ├── stock=GOOG
        │   └── part-00014-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
        └── stock=MSFT
            └── part-00152-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json

The MSFT file looks like this:

{"values":{"MSFT":{"1388534400":55,"1388620800":55}}}

While the "values" column name is not part of the dictionary structure you asked for, I hope this illustrates what you can do.
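Once the data is partitioned on disk like this, the dictionary for a single stock can be rebuilt on demand instead of holding everything in memory. The sketch below is one possible way to do that in plain Python, assuming the month=.../stock=... layout and the one-JSON-object-per-line file format shown above; the function name load_prices and the sample file name part-00000.json are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def load_prices(root, stock):
    """Merge every month's {date: price} map for one stock into a single dict."""
    prices = {}
    # Partition directories are named month=<date>/stock=<ticker>.
    for part in sorted(Path(root).glob(f"month=*/stock={stock}/*.json")):
        with part.open() as handle:
            for line in handle:
                if line.strip():
                    record = json.loads(line)
                    # Strip the "values" wrapper and the ticker level.
                    prices.update(record["values"][stock])
    return prices


# Demo against a temporary directory that mimics the layout shown above.
with tempfile.TemporaryDirectory() as tmp:
    part_dir = Path(tmp) / "month=2014-01-01" / "stock=MSFT"
    part_dir.mkdir(parents=True)
    (part_dir / "part-00000.json").write_text(
        '{"values":{"MSFT":{"1388534400":55,"1388620800":55}}}\n'
    )
    msft_prices = load_prices(tmp, "MSFT")

print(msft_prices)
```

Reading only the directories for the requested stock is what keeps memory use proportional to one ticker rather than the full dataset.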
