How to asynchronously apply function via Spark to subsets of dataframe?


Problem description

I've written a program in Python and pandas which takes a very large dataset (~4 million rows per month for 6 months), groups it by two of the columns (date and a label), and then applies a function to each group of rows. The number of rows in each grouping varies, anywhere from a handful of rows to thousands of rows. There are thousands of groups per month (label-date combinations).

My current program uses multiprocessing, so it's pretty efficient, and I thought it would map well to Spark. I've worked with map-reduce before, but I'm having trouble implementing this in Spark. I'm sure I'm missing some concept in the pipelining, but everything I've read seems to focus on key-value processing, or on splitting a distributed dataset into arbitrary partitions, rather than on what I'm trying to do. Is there a simple example or paradigm for doing this? Any help would be greatly appreciated.

EDIT:
Here's some pseudo-code for what I'm currently doing:

import multiprocessing as mp
import pandas as pd

reader = pd.read_csv(...)  # input path omitted in the original pseudo-code
pool = mp.Pool(processes=4)
labels = reader.label.unique()  # the "list of unique labels"
for label in labels:
    dates = reader.loc[reader.label == label, "date"].unique()
    for date in dates:
        df = reader[(reader.label == label) & (reader.date == date)]
        # & is the element-wise operator in pandas; apply_async takes its args as a tuple
        pool.apply_async(process, (df,), callback=callbackFunc)
pool.close()
pool.join()

When I say asynchronous, I mean something analogous to pool.apply_async().


Solution

As of now (PySpark 1.5.0) I see only three options:


  1. You can try to express your logic using SQL operations and UDFs. Unfortunately the Python API doesn't support UDAFs (User Defined Aggregate Functions), but it is still expressive enough, especially with window functions, to cover a wide range of scenarios (see the sketch after the list below).

    Access to external data sources can be handled in a couple of ways, including:


    • access inside the UDF with optional memoization
    • loading into a data frame and using a join operation
    • using a broadcast variable
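
    As an illustration only, a minimal sketch of how option 1 might look. The column names label, date and value and the per-row logic are hypothetical (not taken from the question), and in 1.5.0 window functions need a HiveContext:

        from pyspark.sql import Window
        from pyspark.sql import functions as F
        from pyspark.sql.types import DoubleType

        # Per-group statistics computed over a window spanning each (label, date) group.
        w = Window.partitionBy("label", "date")
        with_stats = (df
                      .withColumn("group_mean", F.avg("value").over(w))
                      .withColumn("group_count", F.count("value").over(w)))

        # Row-level logic that the built-in functions cannot express goes into a UDF.
        deviation = F.udf(lambda v, m: float(v - m), DoubleType())  # hypothetical example logic
        result = with_stats.withColumn("deviation", deviation("value", "group_mean"))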


  2. Converting the data frame to a PairRDD and using one of the following (see the sketch after the list below):


    • partitionBy + mapPartitions

    • reduceByKey / aggregateByKey
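
    As a rough sketch under the same hypothetical schema (label, date, value), the data frame can be keyed by (label, date) and aggregated group by group with reduceByKey:

        # Key each row by the grouping columns and pre-aggregate per group,
        # e.g. a (sum, count) pair that yields a per-group mean.
        pairs = df.rdd.map(lambda row: ((row.label, row.date), (row.value, 1)))
        sums = pairs.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
        means = sums.mapValues(lambda s: s[0] / float(s[1]))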


If Python is not a strong requirement, the Scala API > 1.5.0 supports UDAFs, which enable something like this:

  df.groupBy(some_columns: _*).agg(some_udaf)

  3. Partitioning data by key and using local Pandas data frames per partition (see the sketch below).
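
    A sketch of this last approach, assuming process is the existing per-group pandas function from the question, the same hypothetical label/date/value columns, and an arbitrary placeholder partition count:

        import pandas as pd

        def process_partition(rows):
            # Materialize one partition as a local pandas frame and apply the
            # existing per-group logic to each (label, date) group it contains.
            pdf = pd.DataFrame([row.asDict() for row in rows])
            if pdf.empty:
                return []
            return [process(group) for _, group in pdf.groupby(["label", "date"])]

        results = (df.rdd
                   .keyBy(lambda row: (row.label, row.date))
                   .partitionBy(200)     # placeholder partition count; same key -> same partition
                   .values()             # drop the key, rows stay in their partitions
                   .mapPartitions(process_partition))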

