How to run independent transformations in parallel using PySpark?


Problem description

I am trying to run two functions that perform completely independent transformations on a single RDD, in parallel, using PySpark. What are some methods to do the same?

from multiprocessing import Process

from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext

def doXTransforms(sampleRDD):
    ...  # (X transforms)

def doYTransforms(sampleRDD):
    ...  # (Y transforms)

if __name__ == "__main__":
    sc = SparkContext(appName="parallelTransforms")
    sqlContext = SQLContext(sc)
    hive_context = HiveContext(sc)

    rows_rdd = hive_context.sql("select * from tables.X_table")

    # Attempt to run both transformations in separate OS processes.
    p1 = Process(target=doXTransforms, args=(rows_rdd,))
    p1.start()
    p2 = Process(target=doYTransforms, args=(rows_rdd,))
    p2.start()
    p1.join()
    p2.join()
    sc.stop()

This does not work, and I now understand why it will not work. But is there any alternative way to make this work? Specifically, are there any PySpark-specific solutions?

Answer

Just use threads and make sure the cluster has enough resources to process both tasks at the same time.

from threading import Thread
import time

def process(rdd, f):
    # Add an artificial delay so the two concurrent jobs are easy to observe.
    def delay(x):
        time.sleep(1)
        return f(x)
    return rdd.map(delay).sum()


rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))

# Each thread submits its own Spark job; both run on the cluster at the same time.
t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()
t1.join(); t2.join()

Arguably this is not often useful in practice, but otherwise it should work just fine.

You can further use in-application scheduling with the FAIR scheduler and scheduler pools for better control over the execution strategy.
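
For illustration, here is a minimal sketch (not part of the original answer) of how the FAIR scheduler and per-thread scheduler pools might be wired up; the pool names and job functions are placeholders:

from threading import Thread

from pyspark import SparkConf, SparkContext

# Enable FAIR scheduling; pools can also be configured in a fairscheduler.xml file.
conf = SparkConf().setAppName("parallelTransforms").set("spark.scheduler.mode", "FAIR")
sc = SparkContext(conf=conf)

def run_in_pool(pool_name, rdd, f):
    # Jobs submitted from this thread are assigned to the named pool.
    sc.setLocalProperty("spark.scheduler.pool", pool_name)
    return rdd.map(f).sum()

rdd = sc.parallelize(range(100))
t1 = Thread(target=run_in_pool, args=("pool_x", rdd, lambda x: x * 2))
t2 = Thread(target=run_in_pool, args=("pool_y", rdd, lambda x: x + 1))
t1.start(); t2.start()
t1.join(); t2.join()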

You can also try pyspark-asyncactions (disclaimer: the author of this answer is also the author of the package), which provides a set of wrappers around the Spark API and concurrent.futures:

import asyncactions  # importing the package adds the *Async methods to the RDD API
import concurrent.futures

# Both counts are submitted without blocking and run concurrently on the cluster.
f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()

[x.result() for x in concurrent.futures.as_completed([f1, f2])]
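
If you would rather stay with the standard library only, a similar effect can be obtained by submitting blocking actions to a thread pool. This is a minimal sketch, assuming the same rdd as above:

from concurrent.futures import ThreadPoolExecutor, as_completed

# Each submitted callable triggers a blocking Spark action; the two jobs run concurrently.
with ThreadPoolExecutor(max_workers=2) as pool:
    f1 = pool.submit(lambda: rdd.filter(lambda x: x % 3 == 0).count())
    f2 = pool.submit(lambda: rdd.filter(lambda x: x % 11 == 0).count())
    results = [f.result() for f in as_completed([f1, f2])]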

